diff --git a/.travis.yml b/.travis.yml index 199dc029..dc6c39cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,8 +10,12 @@ addons: - libpython3.4-dev - python3-numpy - swig + - libmpich-dev + - mpich install: - pip install -r requirements.txt +- pip install -r requirements/backend-mpi.txt +- pip install -r requirements/backend-spark.txt script: - make test deploy: diff --git a/Makefile b/Makefile index cccd3d66..c2aed790 100644 --- a/Makefile +++ b/Makefile @@ -4,10 +4,10 @@ MAKEDIRS=$(shell find examples -name Makefile -exec dirname {} \;) whl_file = abcpy-${VERSION}-py3-none-any.whl .DEFAULT: help -.PHONY: help clean doc doctest exampletest package test uninstall unittest install reinstall $(MAKEDIRS) +.PHONY: help clean doc doctest exampletest package test uninstall unittest unittest_mpi install reinstall $(MAKEDIRS) help: - @echo Targets are: clean, doc, doctest, exampletest, package, uninstall, unittest, test + @echo Targets are: clean, doc, doctest, exampletest, package, uninstall, unittest, unittest_mpi , test clean: find . -name "*.pyc" -type f -delete @@ -15,23 +15,34 @@ clean: find . -name ".#*" -delete find . -name "#*#" -delete -test: unittest exampletest doctest +$(MAKEDIRS): + make -C $@ -unittest: - python3 -m unittest discover -s tests -v -p "*_tests.py" || (echo "Error in unit tests."; exit 1) +# testing +test: unittest unittest_mpi exampletest exampletest_mpi doctest -$(MAKEDIRS): - make -C $@ +unittest: + echo "Running standard unit tests.." + python3 -m unittest discover -s tests -v -p "*_tests.py" || (echo "Error in standard unit tests."; exit 1) -doctest: - make -C doc html || (echo "Error in documentation generator."; exit 1) +unittest_mpi: + echo "Running MPI backend unit tests.." + mpirun -np 2 python3 -m unittest discover -s tests -v -p "backend_tests_mpi.py" || (echo "Error in MPI unit tests."; exit 1) exampletest: $(MAKEDIRS) + echo "Testing standard examples.." python3 -m unittest discover -s examples -v -p "*.py" || (echo "Error in example tests."; exit 1) +exampletest_mpi: + echo "Testing MPI backend examples.." + mpirun -np 2 python3 -m unittest -v examples/backends/mpi/pmcabc_gaussian.py || (echo "Error in MPI example tests."; exit 1) + +doctest: + make -C doc html || (echo "Error in documentation generator."; exit 1) + coveragetest: - command -v coverage >/dev/null 2>&1 || { echo >&2 "Python package 'coverage' has to be installed. Please, run 'pip3 install coverage'."; exit;} + command -v coverage >/dev/null 2>&1 || { echo >&2 "Python package 'coverage' has to been installed. 
Please, run 'pip3 install coverage'."; exit;} @- $(foreach TEST, $(UNITTESTS), \ echo === Testing code coverage: $(TEST); \ python3 -m unittest $(TEST); \ diff --git a/VERSION b/VERSION index ee1372d3..be586341 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.2 +0.3 diff --git a/abcpy/backends/__init__.py b/abcpy/backends/__init__.py new file mode 100644 index 00000000..93a9b88b --- /dev/null +++ b/abcpy/backends/__init__.py @@ -0,0 +1,14 @@ +from abcpy.backends.base import * + + +def BackendMPI(*args,**kwargs): + from abcpy.backends.mpi import BackendMPI + return BackendMPI(*args,**kwargs) + +def BackendMPITestHelper(*args,**kwargs): + from abcpy.backends.mpi import BackendMPITestHelper + return BackendMPITestHelper(*args,**kwargs) + +def BackendSpark(*args,**kwargs): + from abcpy.backends.spark import BackendSpark + return BackendSpark(*args,**kwargs) diff --git a/abcpy/backends.py b/abcpy/backends/base.py similarity index 61% rename from abcpy/backends.py rename to abcpy/backends/base.py index 461d60ed..c525e34b 100644 --- a/abcpy/backends.py +++ b/abcpy/backends/base.py @@ -226,148 +226,3 @@ def __init__(self, object): def value(self): return self.object - - - -class BackendSpark(Backend): - """ - A parallelization backend for Apache Spark. It is essetially a wrapper for - the required Spark functionality. - """ - - def __init__(self, sparkContext, parallelism=4): - """ - Initialize the backend with an existing and configured SparkContext. - - Parameters - ---------- - sparkContext: pyspark.SparkContext - an existing and fully configured PySpark context - parallelism: int - defines on how many workers a distributed dataset can be distributed - """ - self.sc = sparkContext - self.parallelism = parallelism - - - def parallelize(self, python_list): - """ - This is a wrapper of pyspark.SparkContext.parallelize(). - - Parameters - ---------- - list: Python list - list that is distributed on the workers - - Returns - ------- - PDSSpark class (parallel data set) - A reference object that represents the parallelized list - """ - - rdd = self.sc.parallelize(python_list, self.parallelism) - pds = PDSSpark(rdd) - return pds - - - def broadcast(self, object): - """ - This is a wrapper for pyspark.SparkContext.broadcast(). - - Parameters - ---------- - object: Python object - An abitrary object that should be available on all workers - Returns - ------- - BDSSpark class (broadcast data set) - A reference to the broadcasted object - """ - - bcv = self.sc.broadcast(object) - bds = BDSSpark(bcv) - return bds - - - def map(self, func, pds): - """ - This is a wrapper for pyspark.rdd.map() - - Parameters - ---------- - func: Python func - A function that can be applied to every element of the pds - pds: PDSSpark class - A parallel data set to which func should be applied - Returns - ------- - PDSSpark class - a new parallel data set that contains the result of the map - """ - - rdd = pds.rdd.map(func) - new_pds = PDSSpark(rdd) - return new_pds - - - def collect(self, pds): - """ - A wrapper for pyspark.rdd.collect() - - Parameters - ---------- - pds: PDSSpark class - a parallel data set - Returns - ------- - Python list - all elements of pds as a list - """ - - python_list = pds.rdd.collect() - return python_list - - - -class PDSSpark(PDS): - """ - This is a wrapper for Apache Spark RDDs. 
- """ - - def __init__(self, rdd): - """ - Returns - ------- - rdd: pyspark.rdd - initialize with an Spark RDD - """ - - self.rdd = rdd - - - -class BDSSpark(BDS): - """ - This is a wrapper for Apache Spark Broadcast variables. - """ - - def __init__(self, bcv): - """ - Parameters - ---------- - bcv: pyspark.broadcast.Broadcast - Initialize with a Spark broadcast variable - """ - - self.bcv = bcv - - - def value(self): - """ - Returns - ------- - object - returns the referenced object that was broadcasted. - """ - - return self.bcv.value diff --git a/abcpy/backends/mpi.py b/abcpy/backends/mpi.py new file mode 100644 index 00000000..52d04acc --- /dev/null +++ b/abcpy/backends/mpi.py @@ -0,0 +1,602 @@ +import numpy as np +import cloudpickle + +from mpi4py import MPI +from abcpy.backends import Backend, PDS, BDS + +class BackendMPIMaster(Backend): + """Defines the behavior of the master process + + This class defines the behavior of the master process (The one + with rank==0) in MPI. + + """ + + #Define some operation codes to make it more readable + OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7] + finalized = False + + def __init__(self, master_node_ranks=[0]): + + self.comm = MPI.COMM_WORLD + self.size = self.comm.Get_size() + self.rank = self.comm.Get_rank() + + self.master_node_ranks = master_node_ranks + + #Initialize the current_pds_id and bds_id + self.__current_pds_id = 0 + self.__current_bds_id = 0 + + #Initialize a BDS store for both master & slave. + self.bds_store = {} + + + def __command_slaves(self, command, data): + """ + This method handles the sending of the command to the slaves + telling them what operation to perform next. + + Parameters + ---------- + command: operation code of OP_xxx + One of the operation codes defined in the class definition as OP_xxx + which tell the slaves what operation they're performing. + data: tuple + Any of the data required for the operation which needs to be bundled + in the data packet sent. + """ + + if command == self.OP_PARALLELIZE: + #In parallelize we receive data as (pds_id) + data_packet = (command, data[0]) + + elif command == self.OP_MAP: + #In map we receive data as (pds_id,pds_id_new,func) + #Use cloudpickle to dump the function into a string. + function_packed = self.__sanitize_and_pack_func(data[2]) + data_packet = (command, data[0], data[1], function_packed) + + elif command == self.OP_BROADCAST: + data_packet = (command, data[0]) + + elif command == self.OP_COLLECT: + #In collect we receive data as (pds_id) + data_packet = (command, data[0]) + + elif command == self.OP_DELETEPDS or command == self.OP_DELETEBDS: + #In deletepds we receive data as (pds_id) or bds_id + data_packet = (command, data[0]) + + elif command == self.OP_FINISH: + data_packet = (command,) + + _ = self.comm.bcast(data_packet, root=0) + + + def __sanitize_and_pack_func(self, func): + """ + Prevents the function from packing the backend by temporarily + setting it to another variable and then uses cloudpickle + to pack it into a string to be sent. 
+ + Parameters + ---------- + func: Python Function + The function we are supposed to pack while sending it along to the slaves + during the map function + + Returns + ------- + Returns a string of the function packed by cloudpickle + + """ + + #Set the backend to None to prevent it from being packed + globals()['backend'] = {} + + function_packed = cloudpickle.dumps(func) + + #Reset the backend to self after it's been packed + globals()['backend'] = self + + return function_packed + + + def __generate_new_pds_id(self): + """ + This method generates a new pds_id to associate a PDS with it's remote counterpart + that slaves use to store & index data based on the pds_id they receive + + Returns + ------- + Returns a unique integer id. + + """ + + self.__current_pds_id += 1 + return self.__current_pds_id + + + def __generate_new_bds_id(self): + """ + This method generates a new bds_id to associate a BDS with it's remote counterpart + that slaves use to store & index data based on the bds_id they receive + + Returns + ------- + Returns a unique integer id. + + """ + + self.__current_bds_id += 1 + return self.__current_bds_id + + + def parallelize(self, python_list): + """ + This method distributes the list on the available workers and returns a + reference object. + + The list is split into number of workers many parts as a numpy array. + Each part is sent to a separate worker node using the MPI scatter. + + MASTER: python_list is the real data that is to be split up + + Parameters + ---------- + list: Python list + the list that should get distributed on the worker nodes + + Returns + ------- + PDSMPI class (parallel data set) + A reference object that represents the parallelized list + """ + + # Tell the slaves to enter parallelize() + pds_id = self.__generate_new_pds_id() + self.__command_slaves(self.OP_PARALLELIZE, (pds_id,)) + + #Initialize empty data lists for the processes on the master node + rdd_masters = [[] for i in range(len(self.master_node_ranks))] + + #Split the data only amongst the number of workers + rdd_slaves = np.array_split(python_list, self.size - len(self.master_node_ranks), axis=0) + + #Combine the lists into the final rdd before we split it across all ranks. + rdd = rdd_masters + rdd_slaves + + data_chunk = self.comm.scatter(rdd, root=0) + + pds = PDSMPI(data_chunk, pds_id, self) + + return pds + + + def map(self, func, pds): + """ + A distributed implementation of map that works on parallel data sets (PDS). + + On every element of pds the function func is called. + + Parameters + ---------- + func: Python func + A function that can be applied to every element of the pds + pds: PDS class + A parallel data set to which func should be applied + + Returns + ------- + PDSMPI class + a new parallel data set that contains the result of the map + """ + + # Tell the slaves to enter the map() with the current pds_id & func. + #Get pds_id of dataset we want to operate on + pds_id = pds.pds_id + + #Generate a new pds_id to be used by the slaves for the resultant PDS + pds_id_new = self.__generate_new_pds_id() + + data = (pds_id, pds_id_new, func) + self.__command_slaves(self.OP_MAP, data) + + rdd = list(map(func, pds.python_list)) + + pds_res = PDSMPI(rdd, pds_id_new, self) + + return pds_res + + + def collect(self, pds): + """ + Gather the pds from all the workers, + send it to the master and return it as a standard Python list. 
+ + Parameters + ---------- + pds: PDS class + a parallel data set + + Returns + ------- + Python list + all elements of pds as a list + """ + + # Tell the slaves to enter collect with the pds's pds_id + self.__command_slaves(self.OP_COLLECT, (pds.pds_id,)) + + python_list = self.comm.gather(pds.python_list, root=0) + + + # When we gather, the results are a list of lists one + # .. per rank. Undo that by one level and still maintain multi + # .. dimensional output (which is why we cannot use np.flatten) + combined_result = [] + list(map(combined_result.extend, python_list)) + return combined_result + + + def broadcast(self, value): + # Tell the slaves to enter broadcast() + bds_id = self.__generate_new_bds_id() + self.__command_slaves(self.OP_BROADCAST, (bds_id,)) + + _ = self.comm.bcast(value, root=0) + + bds = BDSMPI(value, bds_id, self) + return bds + + + def delete_remote_pds(self, pds_id): + """ + A public function for the PDS objects on the master to call when they go out of + scope or are deleted in order to ensure the same happens on the slaves. + + Parameters + ---------- + pds_id: int + A pds_id identifying the remote PDS on the slaves to delete. + """ + + if not self.finalized: + self.__command_slaves(self.OP_DELETEPDS, (pds_id,)) + + + def delete_remote_bds(self, bds_id): + """ + Public function for the BDS objects on the master to call when they go + out of score or are deleted in order to ensure they are deleted + ont he slaves as well. + + Parameters + ---------- + bds_id: int + A bds_id identifying the remote BDS on the slaves to delete. + """ + + if not self.finalized: + #The master deallocates it's BDS data. Explicit because + #.. bds_store and BDSMPI object are disconnected. + del backend.bds_store[bds_id] + self.__command_slaves(self.OP_DELETEBDS, (bds_id,)) + + + def __del__(self): + """ + Overriding the delete function to explicitly call MPI.finalize(). + This is also required so we can tell the slaves to get out of the + while loop they are in and exit gracefully and they themselves call + finalize when they die. + """ + + #Tell the slaves they can exit gracefully. + self.__command_slaves(self.OP_FINISH, None) + + #Finalize the connection because the slaves should have finished. + MPI.Finalize() + self.finalized = True + + +class BackendMPISlave(Backend): + """Defines the behavior of the slaves processes + + This class defines how the slaves should behave during operation. + Slaves are those processes(not nodes like Spark) that have rank!=0 + and whose ids are not present in the list of non workers. + """ + + OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7] + + + def __init__(self): + self.comm = MPI.COMM_WORLD + self.size = self.comm.Get_size() + self.rank = self.comm.Get_rank() + + #Define the vars that will hold the pds ids received from master to operate on + self.__rec_pds_id = None + self.__rec_pds_id_result = None + + #Initialize a BDS store for both master & slave. + self.bds_store = {} + + #Go into an infinite loop waiting for commands from the user. + self.slave_run() + + + def slave_run(self): + """ + This method is the infinite loop a slave enters directly from init. + It makes the slave wait for a command to perform from the master and + then calls the appropriate function. + + This method also takes care of the synchronization of data between the + master and the slaves by matching PDSs based on the pds_ids sent by the master + with the command. 
+ + Commands received from the master are of the form of a tuple. + The first component of the tuple is always the operation to be performed + and the rest are conditional on the operation. + + (op,pds_id) where op == OP_PARALLELIZE for parallelize + (op,pds_id, pds_id_result,func) where op == OP_MAP for map. + (op,pds_id) where op == OP_COLLECT for a collect operation + (op,pds_id) where op == OP_DELETEPDS for a delete of the remote PDS on slaves + (op,) where op==OP_FINISH for the slave to break out of the loop and terminate + """ + + # Initialize PDS data store here because only slaves need to do it. + self.pds_store = {} + + while True: + data = self.comm.bcast(None, root=0) + + op = data[0] + if op == self.OP_PARALLELIZE: + pds_id = data[1] + self.__rec_pds_id = pds_id + pds = self.parallelize([]) + self.pds_store[pds.pds_id] = pds + + + elif op == self.OP_MAP: + pds_id, pds_id_result, function_packed = data[1:] + self.__rec_pds_id, self.__rec_pds_id_result = pds_id, pds_id_result + + #Use cloudpickle to convert back function string to a function + func = cloudpickle.loads(function_packed) + #Set the function's backend to current class + #so it can access bds_store properly + # func.backend = self + + + # Access an existing PDS + pds = self.pds_store[pds_id] + pds_res = self.map(func, pds) + + # Store the result in a newly gnerated PDS pds_id + self.pds_store[pds_res.pds_id] = pds_res + + elif op == self.OP_BROADCAST: + self.__bds_id = data[1] + self.broadcast(None) + + elif op == self.OP_COLLECT: + pds_id = data[1] + + # Access an existing PDS from data store + pds = self.pds_store[pds_id] + + self.collect(pds) + + elif op == self.OP_DELETEPDS: + pds_id = data[1] + del self.pds_store[pds_id] + + elif op == self.OP_DELETEBDS: + bds_id = data[1] + del self.bds_store[bds_id] + + elif op == self.OP_FINISH: + quit() + else: + raise Exception("Slave recieved unknown command code") + + + def __get_received_pds_id(self): + """ + Function to retrieve the pds_id(s) we received from the master to associate + our slave's created PDS with the master's. + """ + + return self.__rec_pds_id, self.__rec_pds_id_result + + + def parallelize(self, python_list): + """ + This method distributes the list on the available workers and returns a + reference object. + + The list is split into number of workers many parts as a numpy array. + Each part is sent to a separate worker node using the MPI scatter. + + SLAVE: python_list should be [] and is ignored by the scatter() + + Parameters + ---------- + list: Python list + the list that should get distributed on the worker nodes + + Returns + ------- + PDSMPI class (parallel data set) + A reference object that represents the parallelized list + """ + + #Get the PDS id we should store this data in + pds_id, pds_id_new = self.__get_received_pds_id() + + data_chunk = self.comm.scatter(None, root=0) + + pds = PDSMPI(data_chunk, pds_id, self) + + return pds + + + def map(self, func, pds): + """ + A distributed implementation of map that works on parallel data sets (PDS). + + On every element of pds the function func is called. 
+ + Parameters + ---------- + func: Python func + A function that can be applied to every element of the pds + pds: PDS class + A parallel data set to which func should be applied + + Returns + ------- + PDSMPI class + a new parallel data set that contains the result of the map + """ + + #Get the PDS id we operate on and the new one to store the result in + pds_id, pds_id_new = self.__get_received_pds_id() + + rdd = list(map(func, pds.python_list)) + + pds_res = PDSMPI(rdd, pds_id_new, self) + + return pds_res + + + def collect(self, pds): + """ + Gather the pds from all the workers, + send it to the master and return it as a standard Python list. + + Parameters + ---------- + pds: PDS class + a parallel data set + + Returns + ------- + Python list + all elements of pds as a list + """ + + #Send the data we have back to the master + _ = self.comm.gather(pds.python_list, root=0) + + + def broadcast(self, value): + """ + Value is ignored for the slaves. We get data from master + """ + value = self.comm.bcast(None, root=0) + self.bds_store[self.__bds_id] = value + + +class BackendMPI(BackendMPIMaster if MPI.COMM_WORLD.Get_rank() == 0 else BackendMPISlave): + """A backend parallelized by using MPI + + The backend conditionally inherits either the BackendMPIMaster class + or the BackendMPISlave class depending on it's rank. This lets + BackendMPI have a uniform interface for the user but allows for a + logical split between functions performed by the master + and the slaves. + """ + + def __init__(self, master_node_ranks=[0]): + self.comm = MPI.COMM_WORLD + self.size = self.comm.Get_size() + self.rank = self.comm.Get_rank() + + if self.size < 2: + raise ValueError('A minimum of 2 ranks are required for the MPI backend') + + + #Set the global backend + globals()['backend'] = self + + + #Call the appropriate constructors and pass the required data + if self.rank == 0: + super().__init__(master_node_ranks) + else: + super().__init__() + raise Exception("Slaves exitted main loop.") + + +class PDSMPI(PDS): + """ + This is an MPI wrapper for a Python parallel data set. + """ + + def __init__(self, python_list, pds_id, backend_obj): + self.python_list = python_list + self.pds_id = pds_id + self.backend_obj = backend_obj + + def __del__(self): + """ + Destructor to be called when a PDS falls out of scope and/or is being deleted. + Uses the backend to send a message to destroy the slaves' copy of the pds. + """ + try: + self.backend_obj.delete_remote_pds(self.pds_id) + except AttributeError: + #Catch "delete_remote_pds not defined" for slaves and ignore. + pass + + +class BDSMPI(BDS): + """ + This is a wrapper for MPI's BDS class. + """ + + def __init__(self, object, bds_id, backend_obj): + #The BDS data is no longer saved in the BDS object. + #It will access & store the data only from the current backend + self.bds_id = bds_id + backend.bds_store[self.bds_id] = object + # self.backend_obj = backend_obj + + def value(self): + """ + This method returns the actual object that the broadcast data set represents. + """ + return backend.bds_store[self.bds_id] + + def __del__(self): + """ + Destructor to be called when a BDS falls out of scope and/or is being deleted. + Uses the backend to send a message to destroy the slaves' copy of the bds. + """ + + try: + backend.delete_remote_bds(self.bds_id) + except AttributeError: + #Catch "delete_remote_pds not defined" for slaves and ignore. 
+ pass + +class BackendMPITestHelper: + """ + Helper function for some of the test cases to be able to access and verify class members. + """ + def check_pds(self, k): + """Checks if a PDS exists in the pds data store. Used to verify deletion and creation + """ + return k in backend.pds_store.keys() + + def check_bds(self, k): + """Checks if a BDS exists in the BDS data store. Used to verify deletion and creation + """ + return k in backend.bds_store.keys() diff --git a/abcpy/backends/spark.py b/abcpy/backends/spark.py new file mode 100644 index 00000000..33d960a9 --- /dev/null +++ b/abcpy/backends/spark.py @@ -0,0 +1,145 @@ + +from abcpy.backends import Backend, PDS, BDS + +class BackendSpark(Backend): + """ + A parallelization backend for Apache Spark. It is essetially a wrapper for + the required Spark functionality. + """ + + def __init__(self, sparkContext, parallelism=4): + """ + Initialize the backend with an existing and configured SparkContext. + + Parameters + ---------- + sparkContext: pyspark.SparkContext + an existing and fully configured PySpark context + parallelism: int + defines on how many workers a distributed dataset can be distributed + """ + self.sc = sparkContext + self.parallelism = parallelism + + + def parallelize(self, python_list): + """ + This is a wrapper of pyspark.SparkContext.parallelize(). + + Parameters + ---------- + list: Python list + list that is distributed on the workers + + Returns + ------- + PDSSpark class (parallel data set) + A reference object that represents the parallelized list + """ + + rdd = self.sc.parallelize(python_list, self.parallelism) + pds = PDSSpark(rdd) + return pds + + + def broadcast(self, object): + """ + This is a wrapper for pyspark.SparkContext.broadcast(). + + Parameters + ---------- + object: Python object + An abitrary object that should be available on all workers + Returns + ------- + BDSSpark class (broadcast data set) + A reference to the broadcasted object + """ + + bcv = self.sc.broadcast(object) + bds = BDSSpark(bcv) + return bds + + + def map(self, func, pds): + """ + This is a wrapper for pyspark.rdd.map() + + Parameters + ---------- + func: Python func + A function that can be applied to every element of the pds + pds: PDSSpark class + A parallel data set to which func should be applied + Returns + ------- + PDSSpark class + a new parallel data set that contains the result of the map + """ + + rdd = pds.rdd.map(func) + new_pds = PDSSpark(rdd) + return new_pds + + + def collect(self, pds): + """ + A wrapper for pyspark.rdd.collect() + + Parameters + ---------- + pds: PDSSpark class + a parallel data set + Returns + ------- + Python list + all elements of pds as a list + """ + + python_list = pds.rdd.collect() + return python_list + + + +class PDSSpark(PDS): + """ + This is a wrapper for Apache Spark RDDs. + """ + + def __init__(self, rdd): + """ + Returns + ------- + rdd: pyspark.rdd + initialize with an Spark RDD + """ + + self.rdd = rdd + + + +class BDSSpark(BDS): + """ + This is a wrapper for Apache Spark Broadcast variables. + """ + + def __init__(self, bcv): + """ + Parameters + ---------- + bcv: pyspark.broadcast.Broadcast + Initialize with a Spark broadcast variable + """ + + self.bcv = bcv + + + def value(self): + """ + Returns + ------- + object + returns the referenced object that was broadcasted. 
+ """ + + return self.bcv.value diff --git a/doc/source/DEVELOP.rst b/doc/source/DEVELOP.rst new file mode 100644 index 00000000..bcdccfe3 --- /dev/null +++ b/doc/source/DEVELOP.rst @@ -0,0 +1,26 @@ +Branching Scheme +================ + +We use the branching strategy described in this `blog post `_. + + +Deploy a new Release +==================== + +This documentation is mainly intended for the main developers. The deployment of +new releases is automated using Travis CI. However, there are still a few manual +steps required in order to deploy a new release. Assume we want to deploy the +new version `M.m.b': + +1. Create a release branch `release-M.m.b` +2. Adapt `VERSION` file in the repos root directiory `echo M.m.b > VERSION` +3. Merge all desired feature branches into the release branch +4. Create a pull/ merge request: release branch -> master + +After a successfull merge: + +5. Create tag vM.m.b (`git tag vM.m.b`) and push the tag (`git push --tags`) +6. Create a release in Github + +The new tag on master will signal Travis to deploy a new package to Pypi while +the GitHub release is just for user documentation. diff --git a/doc/source/README.rst b/doc/source/README.rst index ff233b3e..860d067a 100644 --- a/doc/source/README.rst +++ b/doc/source/README.rst @@ -142,10 +142,18 @@ And certainly, a journal can easily be saved to and loaded from disk: :language: python :lines: 60, 63 :dedent: 4 - - + + +Using Parallelization Backends +============================== + +Running ABC algorithms is often computationally expensive, thus ABCpy is build +with parallelization in mind. In order to run your inference schemes in parallel +on multiple nodes (computers) you can choose from the following backends. + + Using the Spark Backend -======================= +~~~~~~~~~~~~~~~~~~~~~~~ To run ABCpy in parallel using Apache Spark, one only needs to use the provided Spark backend. Considering the example from above, the statements for the @@ -153,7 +161,7 @@ backend have to be changed to .. literalinclude:: ../../examples/backends/apache_spark/pmcabc_gaussian.py :language: python - :lines: 29-32 + :lines: 6-9 :dedent: 4 In words, a Spark context has to be created and passed to the Spark @@ -166,20 +174,130 @@ The standard way to run the script on Spark is via the spark-submit command: :: - PYSPARK_PYTHON=python3 spark-submit gaussian.py + PYSPARK_PYTHON=python3 spark-submit pmcabc_gaussian.py Often Spark installations use Python 2 by default. To make Spark use the required Python 3 interpreter, the `PYSPARK_PYTHON` environment variable can be set. The adapted python code can be found in -`examples/backend/apache_spark/gaussian.py`. - +`examples/backend/apache_spark/pmcabc_gaussian.py`. + Note that in order to run jobs in parallel you need to have Apache Spark -installed on the system in question. Details on the installation can be found on -the official `homepage `_. Further, keep in mind that -the ABCpy library has to be properly installed on the cluster, such that it is -available to the Python interpreters on the master and the worker nodes. +installed on the system in question. The dependencies of the MPI backend can be +install with `pip install -r requirements/backend-spark.txt`. + +Details on the installation can be found on the official `homepage +`_. Further, keep in mind that the ABCpy library has to +be properly installed on the cluster, such that it is available to the Python +interpreters on the master and the worker nodes. 
+ +Using the MPI Backend +~~~~~~~~~~~~~~~~~~~~~ + +To run ABCpy in parallel using MPI, one only needs to use the provided MPI +backend. Using the same example as above, the statements for the backend have to +be changed to + +.. literalinclude:: ../../examples/backends/mpi/pmcabc_gaussian.py + :language: python + :lines: 6-7 + :dedent: 4 + +In words, one only needs to initialize an instance of the MPI backend. The +number of ranks to spawn are specified at runtime through the way the script is +run. A minimum of two ranks is required, since rank 0 (master) is used to +orchestrade the calculation and all other ranks (workers) actually perform the +calculation. + +The standard way to run the script using Open MPI is directly via mpirun like below +or on a cluster through a job scheduler like Slurm: + +:: + + mpirun -np 4 python3 pmcabc_gaussian.py + + +The adapted Python code can be found in +`examples/backend/mpi/pmcabc_gaussian.py`. + +Note that in order to run jobs in parallel you need to have MPI installed on the +system(s) in question with the requisite Python bindings for MPI (mpi4py). The +dependencies of the MPI backend can be install with +`pip install -r requirements/backend-mpi.txt`. + +Details on the installation can be found on the official `Open MPI homepage +`_ and the `mpi4py homepage +`_. Further, keep in mind that the ABCpy library has +to be properly installed on the cluster, such that it is available to the Python +interpreters on the master and the worker nodes. + +Using Cluster Infrastructure +============================ + +When your model is computationally expensive and/or other factors require +compute infrastructure that goes beyond a single notebook or workstation you can +easily run ABCpy on infrastructure for cluster or high-performance computing. + +Running on Amazon Web Services +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +We show with high level steps how to get ABCpy running on Amazon Web Services +(AWS). Please note, that this is not a complete guide to AWS, so we would like +to refer you to the respective documentation. The first step would be to setup a +AWS Elastic Map Reduce (EMR) cluster which comes with the option of a +pre-configured Apache Spark. Then, we show how to run a simple inference code on +this cluster. + +Setting up the EMR Cluster +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When we setup an EMR cluster we want to install ABCpy on every node of the +cluster. Therefore, we provide a bootstrap script that does this job for us. On +your local machine create a file named `emr_bootstrap.sh` with the following +content: + +:: + + #!/bin/sh + sudo yum -y install git + sudo pip-3.4 install ipython findspark abcpy + +In AWS go to Services, then S3 under the Storage Section. Create a new bucket +called `abcpy` and upload your bootstrap script `emr_bootstap.sh`. + +To create a cluster, in AWS go to Services and then EMR under the Analytics +Section. Click 'Create Cluster', then choose 'Advanced Options'. In Step 1 +choose the emr-5.7.0 image and make sure only Spark is selected for your cluster +(the other software packages are not required). In Step 2 choose for example one +master node and 4 core nodes (16 vCPUs if you have 4 vCPUs instances). In Step 3 +under the boostrap action, choose custom, and select the script +`abcpy/emr_bootstrap.sh`. In the last step (Step 4), choose a key to access the +master node (we assume that you already setup keys). Start the cluster. 
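+
+For users who prefer the command line over the web console, a roughly
+equivalent cluster can be created with the AWS CLI. The invocation below is a
+sketch only; the key name, instance type and instance count are placeholders
+and the exact arguments may need to be adapted to your account and region:
+
+::
+
+    aws emr create-cluster --name abcpy-cluster \
+        --release-label emr-5.7.0 --applications Name=Spark \
+        --instance-type m4.large --instance-count 5 \
+        --bootstrap-actions Path=s3://abcpy/emr_bootstrap.sh \
+        --ec2-attributes KeyName=my-key --use-default-roles
+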
+ + +Running ABCpy on AWS +~~~~~~~~~~~~~~~~~~~~ + +Log in via SSH and run the following commands to get an example code from ABCpy +running with Python3 support: + +:: + + sudo bash -c 'echo export PYSPARK_PYTHON=python34 >> /etc/spark/conf/spark-env.sh' + git clone https://github.com/eth-cscs/abcpy.git + +Then, to submit a job to the Spark cluster we run the following commands: + +:: + + cd abcpy/examples/backends/ + spark-submit --num-executors 16 pmcabc_gaussian.py + +Clearly the setup can be extended and optimized. For this and basic information +we refer you to the `AWS documentation on +EMR `_. + Implementing a new Model @@ -247,6 +365,121 @@ same way (see `Getting Started`_) as we would do with shipped models. The complete example code can be found `here `_ + +Wrap a Model Written in C++ +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +There are several frameworks that help you integrating your C++/C code into +Python. We showcase examples for + +* `Swig `_ +* `Pybind `_ + +Using Swig +~~~~~~~~~~ + +Swig is a tool that creates a Python wrapper for our C++/C code using an +interface (file) that we have to specify. We can then import the wrapper and +in turn use your C++ code with ABCpy as if it was written in Python. + +We go through a complete example to illustrate how to use a simple Gaussian +model written in C++ with ABCpy. First, have a look at our C++ model: + +.. literalinclude:: ../../examples/extensions/models/gaussian_cpp/gaussian_model_simple.cpp + :language: c++ + :lines: 9 - 17 + +To use this code in Python, we need to specify exactly how to expose the C++ +function to Python. Therefore, we write a Swig interface file that look as +follows: + +.. literalinclude:: ../../examples/extensions/models/gaussian_cpp/gaussian_model_simple.i + :language: c++ + +In the first line we define the module name we later have to import in your +ABCpy Python code. Then, in curly brackets, we specify which libraries we want +to include and which function we want to expose through the wrapper. + +Now comes the tricky part. The model class expects a method `simulate` that +forward-simulates our model and which returns an array of syntetic +observations. However, C++/C does not know the concept of returning an array, +instead in C++/C we would provide a memory position (pointer) where to write +the results. Swig has to translate between the two concepts. We use actually an +Swig interface definition from numpy called `import_array`. The line + +.. literalinclude:: ../../examples/extensions/models/gaussian_cpp/gaussian_model_simple.i + :language: c++ + :lines: 18 + +states that we want the two parameters `result` and `k` of the `gaussian_model` +C++ function be interpreted as an array of length k that is returned. Have a +look at the Python code below and observe how the wrapped Python function takes only two +instead of four parameters and returns a numpy array. + +The first stop to get everything running is to translate the Swig interface file +to wrapper code in C++ and Python. +:: + + swig -python -c++ -o gaussian_model_simple_wrap.cpp gaussian_model_simple.i + +This creates two wrapper files `gaussian_model_simple_wrap.cpp` and +`gaussian_model_simple.py`. 
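+For orientation, after the compilation step below the wrapped function can be
+called from Python roughly as follows (a sketch; the argument values are
+illustrative and mirror the example script further down):
+
+::
+
+    from gaussian_model_simple import gaussian_model
+    samples = gaussian_model(10, 0.0, 1.0, 42)  # numpy array of 10 draws for mu=0, sigma=1, seed=42
+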
Now the C++ files can be compiled: +:: + + g++ -fPIC -I /usr/include/python3.5m -c gaussian_model_simple.cpp -o gaussian_model_simple.o + g++ -fPIC -I /usr/include/python3.5m -c gaussian_model_simple_wrap.cpp -o gaussian_model_simple_wrap.o + g++ -shared gaussian_model_simple.o gaussian_model_simple_wrap.o -o _gaussian_model_simple.so + +Note that the include paths might need to be adapted to your system. Finally, we +can write a Python model which uses our C++ code: + +.. literalinclude:: ../../examples/extensions/models/gaussian_cpp/pmcabc-gaussian_model_simple.py + :language: python + :lines: 3 - 32 + +The important lines are where we import the wrapper code as a module (line 2) and call +the respective model function (line -2). + +The full code is available in `examples/extensions/models/gaussion_cpp/`. To +simplify compilation of SWIG and C++ code we created a Makefile. Note that you +might need to adapt some paths in the Makefile. + +Wrap a Model Written in R +------------------------- + +Statisticians often use the R language to build statistical models. R models can +be incorporated within the ABCpy language with the `rpy2` Python package. We +show how to use the `rpy2` package to connect with a model written in R. + +Continuing from the previous sections we use a simple Gaussian model as an +example. The following R code is the contents of the R file `gaussian_model.R`: + +.. literalinclude:: ../../examples/extensions/models/gaussian_R/gaussian_model.R + :language: R + :lines: 1 - 4 + +More complex R models are incorporated in the same way. To include this function +within ABCpy we include the following code at the beginning of our Python file: + +.. literalinclude:: ../../examples/extensions/models/gaussian_R/gaussian_model.py + :language: python + :lines: 5 - 14 + +This imports the R function `simple_gaussian` into the Python environment. We +need to build our own model to incorporate this R function as in the previous +section. The only difference is the `simulate` method of the class `Gaussian'. + +.. automethod:: abcpy.models.Model.simulate + :noindex: + +.. literalinclude:: ../../examples/extensions/models/gaussian_R/gaussian_model.py + :language: python + :lines: 40 - 42 + +The default output for R functions in Python is a float vector. This must be +converted into a Python list for the purposes of ABCpy. + + .. Extending: Add your Distance ============================ diff --git a/doc/source/index.rst b/doc/source/index.rst index d234c5cb..a80317e1 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -17,6 +17,12 @@ Welcome to ABCpy's documentation! README +.. toctree:: + :maxdepth: 2 + :caption: Developer Documentation + + DEVELOP + .. 
toctree:: :maxdepth: 2 :caption: Reference diff --git a/examples/backends/apache_spark/pmcabc_gaussian.py b/examples/backends/apache_spark/pmcabc_gaussian.py index 819791f7..50995e9c 100644 --- a/examples/backends/apache_spark/pmcabc_gaussian.py +++ b/examples/backends/apache_spark/pmcabc_gaussian.py @@ -1,5 +1,14 @@ import numpy as np +def setup_backend(): + global backend + + import pyspark + sc = pyspark.SparkContext() + from abcpy.backends import BackendSpark as Backend + backend = Backend(sc, parallelism=4) + + def infer_parameters(): # define observation for true parameters mean=170, std=15 y_obs = [160.82499176, 167.24266737, 185.71695756, 153.7045709, 163.40568812, 140.70658699, 169.59102084, 172.81041696, 187.38782738, 179.66358934, 176.63417241, 189.16082803, 181.98288443, 170.18565017, 183.78493886, 166.58387299, 161.9521899, 155.69213073, 156.17867343, 144.51580379, 170.29847515, 197.96767899, 153.36646527, 162.22710198, 158.70012047, 178.53470703, 170.77697743, 164.31392633, 165.88595994, 177.38083686, 146.67058471763457, 179.41946565658628, 238.02751620619537, 206.22458790620766, 220.89530574344568, 221.04082532837026, 142.25301427453394, 261.37656571434275, 171.63761180867033, 210.28121820385866, 237.29130237612236, 175.75558340169619, 224.54340549862235, 197.42448680731226, 165.88273684581381, 166.55094082844519, 229.54308602661584, 222.99844054358519, 185.30223966014586, 152.69149367593846, 206.94372818527413, 256.35498655339154, 165.43140916577741, 250.19273595481803, 148.87781549665536, 223.05547559193792, 230.03418198709608, 146.13611923127021, 138.24716809523139, 179.26755740864527, 141.21704876815426, 170.89587081800852, 222.96391329259626, 188.27229523693822, 202.67075179617672, 211.75963110985992, 217.45423324370509] @@ -25,11 +34,6 @@ def infer_parameters(): mean, cov, df = np.array([.0, .0]), np.eye(2), 3. 
kernel = MultiStudentT(mean, cov, df, seed=1) - # define backend - import pyspark - sc = pyspark.SparkContext() - from abcpy.backends import BackendSpark as Backend - backend = Backend(sc, parallelism=4) # define sampling scheme from abcpy.inferences import PMCABC @@ -71,6 +75,7 @@ def setUp(self): findspark.init() def test_example(self): + setup_backend() journal = infer_parameters() test_result = journal.posterior_mean()[0] expected_result = 176.0 @@ -78,6 +83,7 @@ def test_example(self): if __name__ == "__main__": + setup_backend() journal = infer_parameters() analyse_journal(journal) diff --git a/examples/backends/mpi/pmcabc_gaussian.py b/examples/backends/mpi/pmcabc_gaussian.py new file mode 100644 index 00000000..8e142a75 --- /dev/null +++ b/examples/backends/mpi/pmcabc_gaussian.py @@ -0,0 +1,100 @@ +import numpy as np + +def setup_backend(): + global backend + + from abcpy.backends import BackendMPI as Backend + backend = Backend() + + +def infer_parameters(): + # define observation for true parameters mean=170, std=15 + y_obs = [160.82499176, 167.24266737, 185.71695756, 153.7045709, 163.40568812, 140.70658699, 169.59102084, 172.81041696, 187.38782738, 179.66358934, 176.63417241, 189.16082803, 181.98288443, 170.18565017, 183.78493886, 166.58387299, 161.9521899, 155.69213073, 156.17867343, 144.51580379, 170.29847515, 197.96767899, 153.36646527, 162.22710198, 158.70012047, 178.53470703, 170.77697743, 164.31392633, 165.88595994, 177.38083686, 146.67058471763457, 179.41946565658628, 238.02751620619537, 206.22458790620766, 220.89530574344568, 221.04082532837026, 142.25301427453394, 261.37656571434275, 171.63761180867033, 210.28121820385866, 237.29130237612236, 175.75558340169619, 224.54340549862235, 197.42448680731226, 165.88273684581381, 166.55094082844519, 229.54308602661584, 222.99844054358519, 185.30223966014586, 152.69149367593846, 206.94372818527413, 256.35498655339154, 165.43140916577741, 250.19273595481803, 148.87781549665536, 223.05547559193792, 230.03418198709608, 146.13611923127021, 138.24716809523139, 179.26755740864527, 141.21704876815426, 170.89587081800852, 222.96391329259626, 188.27229523693822, 202.67075179617672, 211.75963110985992, 217.45423324370509] + + # define prior + from abcpy.distributions import Uniform + prior = Uniform([150, 5],[200, 25], seed=1) + + # define the model + from abcpy.models import Gaussian + model = Gaussian(prior, seed=1) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import LogReg + distance_calculator = LogReg(statistics_calculator) + + # define kernel + from abcpy.distributions import MultiStudentT + mean, cov, df = np.array([.0, .0]), np.eye(2), 3. 
+ kernel = MultiStudentT(mean, cov, df, seed=1) + + + # define sampling scheme + from abcpy.inferences import PMCABC + sampler = PMCABC(model, distance_calculator, kernel, backend, seed=1) + + # sample from scheme + T, n_sample, n_samples_per_param = 3, 250, 10 + eps_arr = np.array([.75]) + epsilon_percentile = 10 + journal = sampler.sample(y_obs, T, eps_arr, n_sample, n_samples_per_param, epsilon_percentile) + + return journal + + +def analyse_journal(journal): + # output parameters and weights + print(journal.parameters) + print(journal.weights) + + # do post analysis + print(journal.posterior_mean()) + print(journal.posterior_cov()) + print(journal.posterior_histogram()) + + # print configuration + print(journal.configuration) + + # save and load journal + journal.save("experiments.jnl") + + from abcpy.output import Journal + new_journal = Journal.fromFile('experiments.jnl') + + +import unittest +from mpi4py import MPI + +def setUpModule(): + ''' + If an exception is raised in a setUpModule then none of + the tests in the module will be run. + + This is useful because the slaves run in a while loop on initialization + only responding to the master's commands and will never execute anything else. + + On termination of master, the slaves call quit() that raises a SystemExit(). + Because of the behaviour of setUpModule, it will not run any unit tests + for the slave and we now only need to write unit-tests from the master's + point of view. + ''' + setup_backend() + +class ExampleGaussianMPITest(unittest.TestCase): + def test_example(self): + journal = infer_parameters() + test_result = journal.posterior_mean()[0] + expected_result = 176.0 + self.assertLess(abs(test_result - expected_result), 2.) + + +if __name__ == "__main__": + setup_backend() + journal = infer_parameters() + analyse_journal(journal) + + diff --git a/examples/extensions/models/gaussian_R/gaussian_model.R b/examples/extensions/models/gaussian_R/gaussian_model.R new file mode 100644 index 00000000..7c67d69a --- /dev/null +++ b/examples/extensions/models/gaussian_R/gaussian_model.R @@ -0,0 +1,4 @@ +simple_gaussian <- function(mu, sigma, k = 1){ + output <- rnorm(k, mu, sigma) + return(output) +} \ No newline at end of file diff --git a/examples/extensions/models/gaussian_R/gaussian_model.py b/examples/extensions/models/gaussian_R/gaussian_model.py new file mode 100644 index 00000000..1c89c4c8 --- /dev/null +++ b/examples/extensions/models/gaussian_R/gaussian_model.py @@ -0,0 +1,126 @@ +import numpy as np + +from abcpy.models import Model + + +import rpy2.robjects as robjects +import rpy2.robjects.numpy2ri +rpy2.robjects.numpy2ri.activate() + +robjects.r(''' + source('gaussian_model.R') +''') + +r_simple_gaussian = robjects.globalenv['simple_gaussian'] + + +class Gaussian(Model): + def __init__(self, prior, seed=None): + self.prior = prior + self.sample_from_prior() + self.rng = np.random.RandomState(seed) + + def set_parameters(self, theta): + theta = np.array(theta) + + if theta.shape[0] > 2: return False + if theta[1] <= 0: return False + + self.mu = theta[0] + self.sigma = theta[1] + return True + + def get_parameters(self): + return np.array([self.mu, self.sigma]) + + def sample_from_prior(self): + sample = self.prior.sample(1).reshape(-1) + self.set_parameters(sample) + + def simulate(self, k): + output = list(r_simple_gaussian(self.mu, self.sigma, k)) + return output + + +def infer_parameters(): + # define observation for true parameters mean=170, std=15 + y_obs = [160.82499176, 167.24266737, 185.71695756, 
153.7045709, 163.40568812, 140.70658699, 169.59102084, 172.81041696, 187.38782738, 179.66358934, 176.63417241, 189.16082803, 181.98288443, 170.18565017, 183.78493886, 166.58387299, 161.9521899, 155.69213073, 156.17867343, 144.51580379, 170.29847515, 197.96767899, 153.36646527, 162.22710198, 158.70012047, 178.53470703, 170.77697743, 164.31392633, 165.88595994, 177.38083686, 146.67058471763457, 179.41946565658628, 238.02751620619537, 206.22458790620766, 220.89530574344568, 221.04082532837026, 142.25301427453394, 261.37656571434275, 171.63761180867033, 210.28121820385866, 237.29130237612236, 175.75558340169619, 224.54340549862235, 197.42448680731226, 165.88273684581381, 166.55094082844519, 229.54308602661584, 222.99844054358519, 185.30223966014586, 152.69149367593846, 206.94372818527413, 256.35498655339154, 165.43140916577741, 250.19273595481803, 148.87781549665536, 223.05547559193792, 230.03418198709608, 146.13611923127021, 138.24716809523139, 179.26755740864527, 141.21704876815426, 170.89587081800852, 222.96391329259626, 188.27229523693822, 202.67075179617672, 211.75963110985992, 217.45423324370509] + + # define prior + from abcpy.distributions import Uniform + prior = Uniform([150, 5],[200, 25]) + + # define the model + model = Gaussian(prior) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import LogReg + distance_calculator = LogReg(statistics_calculator) + + # define kernel + from abcpy.distributions import MultiStudentT + mean, cov, df = np.array([.0, .0]), np.eye(2), 3. + kernel = MultiStudentT(mean, cov, df) + + # define backend + from abcpy.backends import BackendDummy as Backend + backend = Backend() + + # define sampling scheme + from abcpy.inferences import PMCABC + sampler = PMCABC(model, distance_calculator, kernel, backend) + + # sample from scheme + T, n_sample, n_samples_per_param = 3, 250, 10 + eps_arr = np.array([.75]) + epsilon_percentile = 10 + journal = sampler.sample(y_obs, T, eps_arr, n_sample, n_samples_per_param, epsilon_percentile) + + return journal + + +def analyse_journal(journal): + # output parameters and weights + print(journal.parameters) + print(journal.weights) + + # do post analysis + print(journal.posterior_mean()) + print(journal.posterior_cov()) + print(journal.posterior_histogram()) + + # print configuration + print(journal.configuration) + + # save and load journal + journal.save("experiments.jnl") + + from abcpy.output import Journal + new_journal = Journal.fromFile('experiments.jnl') + + +journal = infer_parameters() +mu = journal.get_parameters()[:,0].reshape(-1,1) +sigma = journal.get_parameters()[:,1].reshape(-1,1) + +import graph_ABC +plot_mu = graph_ABC.plot(mu, true_value = 170) +plot_sigma = graph_ABC.plot(sigma, true_value = 15) + +# this code is for testing purposes only and not relevant to run the example +import unittest +class ExampleExtendModelGaussianPython(unittest.TestCase): + def test_example(self): + journal = infer_parameters() + test_result = journal.posterior_mean()[0] + expected_result = 177.02 + self.assertLess(abs(test_result - expected_result), 2.) 
+ + +if __name__ == "__main__": + journal = infer_parameters() + analyse_journal(journal) diff --git a/examples/extensions/models/gaussian_R/graph_ABC.py b/examples/extensions/models/gaussian_R/graph_ABC.py new file mode 100644 index 00000000..122c6bbf --- /dev/null +++ b/examples/extensions/models/gaussian_R/graph_ABC.py @@ -0,0 +1,31 @@ + +import matplotlib.pyplot as plt +from scipy.stats import gaussian_kde +import numpy as np + +def plot(samples, path = None, true_value = 5, title = 'ABC posterior'): + Bayes_estimate = np.mean(samples, axis = 0) + theta = true_value + xmin, xmax = max(samples[:,0]), min(samples[:,0]) + positions = np.linspace(xmin, xmax, samples.shape[0]) + gaussian_kernel = gaussian_kde(samples[:,0].reshape(samples.shape[0],)) + values = gaussian_kernel(positions) + plt.figure() + plt.plot(positions,gaussian_kernel(positions)) + plt.plot([theta, theta],[min(values), max(values)+.1*(max(values)-min(values))]) + plt.plot([Bayes_estimate, Bayes_estimate],[min(values), max(values)+.1*(max(values)-min(values))]) + plt.ylim([min(values), max(values)+.1*(max(values)-min(values))]) + plt.xlabel(r'$\theta$') + plt.ylabel('density') + #plt.xlim([0,1]) + plt.rc('axes', labelsize=15) + plt.legend(loc='best', frameon=False, numpoints=1) + font = {'size' : 15} + plt.rc('font', **font) + plt.title(title) + if path is not None : + plt.savefig(path) + return plt + + + diff --git a/examples/extensions/models/gaussian_cpp/gaussian_model_simple.cpp b/examples/extensions/models/gaussian_cpp/gaussian_model_simple.cpp index a77c3ab6..46b3aff0 100644 --- a/examples/extensions/models/gaussian_cpp/gaussian_model_simple.cpp +++ b/examples/extensions/models/gaussian_cpp/gaussian_model_simple.cpp @@ -4,8 +4,10 @@ using namespace std; + // Simulation function of the gaussian model -void gaussian_model(double* result, unsigned int k, double mu, double sigma, boost::mt19937 rng) { +void gaussian_model(double* result, unsigned int k, double mu, double sigma, int seed) { + boost::mt19937 rng(seed); boost::normal_distribution<> nd(mu, sigma); boost::variate_generator > sampler(rng, nd); @@ -15,13 +17,11 @@ void gaussian_model(double* result, unsigned int k, double mu, double sigma, boo } - // main function to run the simulation of the Gaussian model int main() { int k = 10; - boost::mt19937 rng; double samples[k]; - gaussian_model(samples, 0.0, 1.0, k, rng); + gaussian_model(samples, 0.0, 1.0, k, 1); for (int i=0; i #include - extern void gaussian_model(double* result, unsigned int k, double mu, double sigma, boost::mt19937 rng); + extern void gaussian_model(double* result, unsigned int k, double mu, double sigma, int seed); %} %include "numpy.i" @@ -15,14 +15,7 @@ import_array(); %} -%inline %{ - boost::mt19937* get_rng(int seed) { - boost::mt19937* rng = new boost::mt19937(seed); - return rng; - } -%} - %apply (double* ARGOUT_ARRAY1, int DIM1 ) {(double* result, unsigned int k)}; -extern void gaussian_model(double* result, unsigned int k, double mu, double sigma, boost::mt19937 rng); +extern void gaussian_model(double* result, unsigned int k, double mu, double sigma, int seed); diff --git a/examples/extensions/models/gaussian_cpp/pmcabc-gaussian_model_simple.py b/examples/extensions/models/gaussian_cpp/pmcabc-gaussian_model_simple.py index 9f2150a2..22a01a43 100644 --- a/examples/extensions/models/gaussian_cpp/pmcabc-gaussian_model_simple.py +++ b/examples/extensions/models/gaussian_cpp/pmcabc-gaussian_model_simple.py @@ -1,7 +1,8 @@ import numpy as np from abcpy.models import Model -from 
gaussian_model import gaussian_model, get_rng +from gaussian_model_simple import gaussian_model + class Gaussian(Model): def __init__(self, prior, seed=None): self.prior = prior @@ -26,9 +27,8 @@ def sample_from_prior(self): self.set_parameters(sample) def simulate(self, k): - cseed = self.rng.randint(np.iinfo(np.int32).max) - crng = get_rng(cseed); - result = gaussian_model(k, self.mu, self.sigma, crng) + seed = self.rng.randint(np.iinfo(np.int32).max) + result = gaussian_model(k, self.mu, self.sigma, seed) return list(result) diff --git a/examples/extensions/models/gaussian_f90/Makefile b/examples/extensions/models/gaussian_f90/Makefile new file mode 100644 index 00000000..763e8097 --- /dev/null +++ b/examples/extensions/models/gaussian_f90/Makefile @@ -0,0 +1,8 @@ +F2PY=f2py3 +EXT_SUFFIX := $(shell python3-config --extension-suffix) + +default: gaussian_model_simple$(EXT_SUFFIX) + +%$(EXT_SUFFIX): %.f90 + $(F2PY) -c -m $* $< + diff --git a/examples/extensions/models/gaussian_f90/gaussian_model_simple.f90 b/examples/extensions/models/gaussian_f90/gaussian_model_simple.f90 new file mode 100644 index 00000000..b9054dca --- /dev/null +++ b/examples/extensions/models/gaussian_f90/gaussian_model_simple.f90 @@ -0,0 +1,49 @@ +module gaussian_model +contains + subroutine gaussian(output, mu, sigma, k, seed) + integer, intent(in) :: k, seed + real(8), intent(in) :: mu, sigma + real(8), intent(out) :: output(k) + + integer :: i, n + real(8) :: r, theta + real(8), dimension(:), allocatable :: temp + integer(4), dimension(:), allocatable :: seed_arr + + ! get random seed array size and fill seed_arr with provided seed + call random_seed(size = n) + allocate(seed_arr(n)) + seed_arr = seed + call random_seed(put = seed_arr) + + ! create 2k random numbers uniformly from [0,1] + if(allocated(temp)) then + deallocate(temp) + end if + allocate(temp(k*2)) + call random_number(temp) + + ! 
Use Box-Muller transfrom to create normally distributed variables + do i = 1, k + r = (-2.0 * log(temp(2*i-1)))**0.5 + theta = 2 * 3.1415926 * temp(2*i) + output(i) = mu + sigma * r * sin(theta) + end do + end subroutine gaussian +end module gaussian_model + +program main + use gaussian_model + implicit none + + integer, parameter :: k = 100 + integer :: seed = 9, i + real(8) :: mu = 10.0, sigma = 2.0 + real(8) :: output(k) + + call gaussian(output, mu, sigma, k, seed) + + do i = 1, k + write(*,*) output(i) + end do +end program main diff --git a/examples/extensions/models/gaussian_f90/pmcabc-gaussian_model_simple.py b/examples/extensions/models/gaussian_f90/pmcabc-gaussian_model_simple.py new file mode 100644 index 00000000..0a2527fe --- /dev/null +++ b/examples/extensions/models/gaussian_f90/pmcabc-gaussian_model_simple.py @@ -0,0 +1,110 @@ +import numpy as np + +from abcpy.models import Model +from gaussian_model_simple import gaussian_model + +class Gaussian(Model): + def __init__(self, prior, seed=None): + self.prior = prior + self.sample_from_prior() + self.rng = np.random.RandomState(seed) + + + def set_parameters(self, theta): + theta = np.array(theta) + if theta.shape[0] > 2: return False + if theta[1] <= 0: return False + + self.mu = theta[0] + self.sigma = theta[1] + return True + + def get_parameters(self): + return np.array([self.mu, self.sigma]) + + def sample_from_prior(self): + sample = self.prior.sample(1).reshape(-1) + self.set_parameters(sample) + + def simulate(self, k): + seed = self.rng.randint(np.iinfo(np.int32).max) + result = gaussian_model(self.mu, self.sigma, k, seed) + return list(result) + + +def infer_parameters(): + # define observation for true parameters mean=170, std=15 + y_obs = [160.82499176, 167.24266737, 185.71695756, 153.7045709, 163.40568812, 140.70658699, 169.59102084, 172.81041696, 187.38782738, 179.66358934, 176.63417241, 189.16082803, 181.98288443, 170.18565017, 183.78493886, 166.58387299, 161.9521899, 155.69213073, 156.17867343, 144.51580379, 170.29847515, 197.96767899, 153.36646527, 162.22710198, 158.70012047, 178.53470703, 170.77697743, 164.31392633, 165.88595994, 177.38083686, 146.67058471763457, 179.41946565658628, 238.02751620619537, 206.22458790620766, 220.89530574344568, 221.04082532837026, 142.25301427453394, 261.37656571434275, 171.63761180867033, 210.28121820385866, 237.29130237612236, 175.75558340169619, 224.54340549862235, 197.42448680731226, 165.88273684581381, 166.55094082844519, 229.54308602661584, 222.99844054358519, 185.30223966014586, 152.69149367593846, 206.94372818527413, 256.35498655339154, 165.43140916577741, 250.19273595481803, 148.87781549665536, 223.05547559193792, 230.03418198709608, 146.13611923127021, 138.24716809523139, 179.26755740864527, 141.21704876815426, 170.89587081800852, 222.96391329259626, 188.27229523693822, 202.67075179617672, 211.75963110985992, 217.45423324370509] + + # define prior + from abcpy.distributions import Uniform + prior = Uniform([150, 5],[200, 25]) + + # define the model + model = Gaussian(prior) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import LogReg + distance_calculator = LogReg(statistics_calculator) + + # define kernel + from abcpy.distributions import MultiStudentT + mean, cov, df = np.array([.0, .0]), np.eye(2), 3. 
+    kernel = MultiStudentT(mean, cov, df)
+
+    # define backend (swap in BackendSpark below to run on a Spark cluster)
+    # from abcpy.backends import BackendSpark as Backend
+    from abcpy.backends import BackendDummy as Backend
+    backend = Backend()
+
+    # define sampling scheme
+    from abcpy.inferences import PMCABC
+    sampler = PMCABC(model, distance_calculator, kernel, backend)
+
+    # sample from scheme
+    T, n_sample, n_samples_per_param = 3, 100, 10
+    eps_arr = np.array([.75])
+    epsilon_percentile = 10
+    journal = sampler.sample(y_obs, T, eps_arr, n_sample, n_samples_per_param, epsilon_percentile)
+
+    return journal
+
+
+def analyse_journal(journal):
+    # output parameters and weights
+    print(journal.parameters)
+    print(journal.weights)
+
+    # do post analysis
+    print(journal.posterior_mean())
+    print(journal.posterior_cov())
+    print(journal.posterior_histogram())
+
+    # print configuration
+    print(journal.configuration)
+
+    # save and load journal
+    journal.save("experiments.jnl")
+
+    from abcpy.output import Journal
+    new_journal = Journal.fromFile('experiments.jnl')
+
+
+# this code is for testing purposes only and not relevant to run the example
+import unittest
+class ExampleExtendModelGaussianF90(unittest.TestCase):
+    def test_example(self):
+        journal = infer_parameters()
+        test_result = journal.posterior_mean()[0]
+        expected_result = 177.02
+        self.assertLess(abs(test_result - expected_result), 1.)
+
+
+if __name__ == "__main__":
+    journal = infer_parameters()
+    analyse_journal(journal)
+
diff --git a/requirements.txt b/requirements.txt
index 04e7d4d8..007b5307 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,6 @@ numpy
 scipy
 sklearn
 glmnet
-findspark
 sphinx==1.4.8
 sphinx_rtd_theme
 coverage
diff --git a/requirements/backend-mpi.txt b/requirements/backend-mpi.txt
new file mode 100644
index 00000000..47e460fa
--- /dev/null
+++ b/requirements/backend-mpi.txt
@@ -0,0 +1,2 @@
+mpi4py
+cloudpickle
diff --git a/requirements/backend-spark.txt b/requirements/backend-spark.txt
new file mode 100644
index 00000000..2e186911
--- /dev/null
+++ b/requirements/backend-spark.txt
@@ -0,0 +1 @@
+findspark
diff --git a/setup.py b/setup.py
index 533b2e33..138d3680 100644
--- a/setup.py
+++ b/setup.py
@@ -22,7 +22,7 @@
     version=version,
 
     description='A framework for approximate Bayesian computation (ABC) that speeds up inference by parallelizing computation on single computers or whole clusters.',
-    long_description='ABCpy is a highly modular, scientific library for approximate Bayesian computation (ABC) written in Python using the parallel computation framework Apache SPARK. The modularity helps domain scientists to easily apply ABC to their research without being ABC experts; using ABCpy they can easily run large parallel simulations without much knowledge about parallelization, even without much additional effort to parallelize their code. Further, ABCpy enables ABC experts to easily develop new inference schemes and evaluate them in a standardized environment, and to extend the library with new algorithms. These benefits come mainly from the modularity of ABCpy.',
+    long_description='ABCpy is a highly modular, scientific library for approximate Bayesian computation (ABC) written in Python. It is designed to run all included ABC algorithms in parallel, either using multiple cores of a single computer or using an Apache Spark or MPI enabled cluster. The modularity helps domain scientists to easily apply ABC to their research without being ABC experts; using ABCpy they can easily run large parallel simulations without much knowledge about parallelization, even without much additional effort to parallelize their code. Further, ABCpy enables ABC experts to easily develop new inference schemes and evaluate them in a standardized environment, and to extend the library with new algorithms. These benefits come mainly from the modularity of ABCpy.',
 
     # The project's main homepage.
     url='https://github.com/eth-cscs/abcpy',
diff --git a/tests/backend_tests_mpi.py b/tests/backend_tests_mpi.py
new file mode 100644
index 00000000..18377f38
--- /dev/null
+++ b/tests/backend_tests_mpi.py
@@ -0,0 +1,138 @@
+import unittest
+from mpi4py import MPI
+from abcpy.backends import BackendMPI, BackendMPITestHelper
+
+
+def setUpModule():
+    '''
+    If an exception is raised in setUpModule, none of the tests in the
+    module will be run.
+
+    This is useful because, on initialization, the slaves enter a while loop
+    in which they only respond to the master's commands and never execute
+    anything else.
+
+    On termination of the master, the slaves call quit(), which raises
+    SystemExit(). Because of this behaviour of setUpModule, no unit tests are
+    run on the slaves, so the tests only need to be written from the master's
+    point of view.
+    '''
+    global rank, backend_mpi
+    comm = MPI.COMM_WORLD
+    rank = comm.Get_rank()
+    backend_mpi = BackendMPI()
+
+class MPIBackendTests(unittest.TestCase):
+
+    def test_parallelize(self):
+        data = [0]*backend_mpi.size
+        pds = backend_mpi.parallelize(data)
+        pds_map = backend_mpi.map(lambda x: x + MPI.COMM_WORLD.Get_rank(), pds)
+        res = backend_mpi.collect(pds_map)
+
+        for master_index in backend_mpi.master_node_ranks:
+            self.assertTrue(master_index not in res, "Node in master_node_ranks performed map.")
+
+    def test_map(self):
+        data = [1,2,3,4,5]
+        pds = backend_mpi.parallelize(data)
+        pds_map = backend_mpi.map(lambda x: x**2, pds)
+        res = backend_mpi.collect(pds_map)
+        assert res == list(map(lambda x: x**2, data))
+
+
+    def test_broadcast(self):
+        data = [1,2,3,4,5]
+        pds = backend_mpi.parallelize(data)
+
+        bds = backend_mpi.broadcast(100)
+
+        # Pollute the master's BDS values to confirm that the slaves
+        # use their broadcasted value
+        for k, v in backend_mpi.bds_store.items():
+            backend_mpi.bds_store[k] = 99999
+
+        def test_map(x):
+            return x + bds.value()
+
+        pds_m = backend_mpi.map(test_map, pds)
+        self.assertTrue(backend_mpi.collect(pds_m) == [101,102,103,104,105])
+
+    def test_pds_delete(self):
+
+        def check_if_exists(x):
+            obj = BackendMPITestHelper()
+            return obj.check_pds(x)
+
+        data = [1,2,3,4,5]
+        pds = backend_mpi.parallelize(data)
+
+        # Check if the PDS we just created exists in all the slaves (+ master)
+
+        id_check_pds = backend_mpi.parallelize([pds.pds_id]*5)
+        pds_check_result = backend_mpi.map(check_if_exists, id_check_pds)
+        self.assertTrue(False not in backend_mpi.collect(pds_check_result), "PDS was not created")
+
+        # Delete the PDS on the master and try again
+        del pds
+        pds_check_result = backend_mpi.map(check_if_exists, id_check_pds)
+
+        self.assertTrue(True not in backend_mpi.collect(pds_check_result), "PDS was not deleted")
+
+
+    def test_bds_delete(self):
+
+        def check_if_exists(x):
+            obj = BackendMPITestHelper()
+            return obj.check_bds(x)
+
+        data = [1,2,3,4,5]
+        bds = backend_mpi.broadcast(data)
+
+        # Check if the BDS we just created exists in all the slaves (+ master)
+        id_check_bds = backend_mpi.parallelize([bds.bds_id]*5)
+        bds_check_result = backend_mpi.map(check_if_exists, id_check_bds)
+        self.assertTrue(False not in backend_mpi.collect(bds_check_result), "BDS was not created")
+
+        # Delete the BDS on the master and try again
+        del bds
+        bds_check_result = backend_mpi.map(check_if_exists, id_check_bds)
+        self.assertTrue(True not in backend_mpi.collect(bds_check_result), "BDS was not deleted")
+
+
+    def test_function_pickle(self):
+        def square(x):
+            return x**2
+
+        class staticfunctest:
+            @staticmethod
+            def square(x):
+                return x**2
+
+        class nonstaticfunctest:
+            def square(self, x):
+                return x**2
+
+        data = [1,2,3,4,5]
+        expected_result = [1,4,9,16,25]
+        pds = backend_mpi.parallelize(data)
+
+
+        pds_map1 = backend_mpi.map(square, pds)
+        pds_res1 = backend_mpi.collect(pds_map1)
+        self.assertTrue(pds_res1 == expected_result, "Failed pickle test for general function")
+
+
+        pds_map2 = backend_mpi.map(lambda x: x**2, pds)
+        pds_res2 = backend_mpi.collect(pds_map2)
+        self.assertTrue(pds_res2 == expected_result, "Failed pickle test for lambda function")
+
+
+        pds_map3 = backend_mpi.map(staticfunctest.square, pds)
+        pds_res3 = backend_mpi.collect(pds_map3)
+        self.assertTrue(pds_res3 == expected_result, "Failed pickle test for static function")
+
+
+        obj = nonstaticfunctest()
+        pds_map4 = backend_mpi.map(obj.square, pds)
+        pds_res4 = backend_mpi.collect(pds_map4)
+        self.assertTrue(pds_res4 == expected_result, "Failed pickle test for non-static function")
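
For reference, below is a minimal usage sketch of the new MPI backend, based only on the API exercised in tests/backend_tests_mpi.py above (BackendMPI, parallelize, broadcast, map, collect, and the value() accessor of a broadcast data set). The script name is hypothetical; run it the same way the new unittest_mpi target does, with something like "mpirun -np 2 python3 mpi_backend_sketch.py":

    # mpi_backend_sketch.py -- hypothetical file name, for illustration only
    # Launch with: mpirun -np 2 python3 mpi_backend_sketch.py
    from abcpy.backends import BackendMPI

    # The master rank constructs the backend; the remaining ranks enter their
    # command loop inside BackendMPI() (see setUpModule in the tests above).
    backend = BackendMPI()

    # Distribute a list, broadcast a constant, and map a function over the chunks.
    data = [1, 2, 3, 4, 5]
    pds = backend.parallelize(data)
    bds = backend.broadcast(10)

    pds_shifted = backend.map(lambda x: x**2 + bds.value(), pds)
    print(backend.collect(pds_shifted))  # expected: [11, 14, 19, 26, 35]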