diff --git a/Makefile b/Makefile index e973beb0..710ddf5a 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,7 @@ unittest: unittest_mpi: echo "Running MPI backend unit tests.." mpirun -np 2 python3 -m unittest discover -s tests -v -p "backend_tests_mpi.py" || (echo "Error in MPI unit tests."; exit 1) + mpirun -np 3 python3 -m unittest discover -s tests -v -p "backend_tests_mpi_model_mpi.py" || (echo "Error in MPI unit tests."; exit 1) exampletest: $(MAKEDIRS) echo "Testing standard examples.." diff --git a/README.md b/README.md index df39d5a6..5dbe9df9 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,10 @@ scientists by providing # Documentation For more information, check out the -* [Documentation](http://abcpy.readthedocs.io/en/v0.5.4) -* [Examples](https://github.com/eth-cscs/abcpy/tree/v0.5.4/examples) directory and -* [Reference](http://abcpy.readthedocs.io/en/v0.5.4/abcpy.html) +* [Documentation](http://abcpy.readthedocs.io/en/v0.5.5) +* [Examples](https://github.com/eth-cscs/abcpy/tree/v0.5.5/examples) directory and +* [Reference](http://abcpy.readthedocs.io/en/v0.5.5/abcpy.html) + Further, we provide a [collection of models](https://github.com/eth-cscs/abcpy-models) for which ABCpy @@ -54,7 +55,8 @@ finally CSCS (Swiss National Super Computing Center) for their generous support. There is a paper in the proceedings of the 2017 PASC conference. In case you use ABCpy for your publication, we would appreciate a citation. You can use -[this](https://github.com/eth-cscs/abcpy/blob/v0.5.4/doc/literature/DuttaS-ABCpy-PASC-2017.bib) +[this](https://github.com/eth-cscs/abcpy/blob/v0.5.5/doc/literature/DuttaS-ABCpy-PASC-2017.bib) + BibTex reference. @@ -62,23 +64,22 @@ BibTex reference. Publications in which ABCpy was applied: -* R. Dutta, M. Schoengens, A. Ummadisingu, J. P. Onnela, A. Mira, "ABCpy: A - High-Performance Computing Perspective to Approximate Bayesian Computation", - 2017, arXiv:1711.04694 - * R. Dutta, J. P. Onnela, A. Mira, "Bayesian Inference of Spreading Processes - on Networks", 2017, arXiv:1709.08862 + on Networks", 2018, Proc. R. Soc. A, 474(2215), 20180129. + +* R. Dutta, Z. Faidon Brotzakis and A. Mira, "Bayesian Calibration of + Force-fields from Experimental Data: TIP4P Water", 2018, Journal of Chemical Physics 149, 154110. * R. Dutta, B. Chopard, J. Lätt, F. Dubois, K. Zouaoui Boudjeltia and A. Mira, "Parameter Estimation of Platelets Deposition: Approximate Bayesian - Computation with High Performance Computing", 2017, arXiv:1710.01054 + Computation with High Performance Computing", 2018, Frontiers in physiology, 9. * A. Ebert, R. Dutta, P. Wu, K. Mengersen and A. Mira, "Likelihood-Free Parameter Estimation for Dynamic Queueing Networks", 2018, arXiv:1804.02526 -* R. Dutta, Z. Faidon Brotzakis and A. Mira, "Bayesian Calibration of - Force-fields from Experimental Data: TIP4P Water", 2018, arXiv:1804.02742 - +* R. Dutta, M. Schoengens, A. Ummadisingu, N. Widerman, J. P. Onnela, A. Mira, "ABCpy: A + High-Performance Computing Perspective to Approximate Bayesian Computation", + 2017, arXiv:1711.04694 ## License ABCpy is published under the BSD 3-clause license, see [here](LICENSE). diff --git a/VERSION b/VERSION index 7d856835..d1d899fa 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.4 +0.5.5 diff --git a/abcpy/approx_lhd.py b/abcpy/approx_lhd.py index e63d6100..a14a54df 100644 --- a/abcpy/approx_lhd.py +++ b/abcpy/approx_lhd.py @@ -69,6 +69,8 @@ def __init__(self, statistics_calc): def likelihood(self, y_obs, y_sim): # print("DEBUG: SynLiklihood.likelihood().") if not isinstance(y_obs, list): + # print("type(y_obs) : ", type(y_obs), " , type(y_sim) : ", type(y_sim)) + # print("y_obs : ", y_obs) raise TypeError('Observed data is not of allowed types') if not isinstance(y_sim, list): @@ -81,14 +83,11 @@ def likelihood(self, y_obs, y_sim): # Extract summary statistics from the simulated data stat_sim = self.statistics_calc.statistics(y_sim) - + # Compute the mean, robust precision matrix and determinant of precision matrix - # print("DEBUG: meansim computation.") mean_sim = np.mean(stat_sim,0) - # print("DEBUG: robust_precision_sim computation.") lw_cov_, _ = ledoit_wolf(stat_sim) robust_precision_sim = np.linalg.inv(lw_cov_) - # print("DEBUG: robust_precision_sim_det computation..") robust_precision_sim_det = np.linalg.det(robust_precision_sim) # print("DEBUG: combining.") tmp1 = robust_precision_sim * np.array(self.stat_obs.reshape(-1,1) - mean_sim.reshape(-1,1)).T diff --git a/abcpy/backends/__init__.py b/abcpy/backends/__init__.py index 93a9b88b..936dfbd0 100644 --- a/abcpy/backends/__init__.py +++ b/abcpy/backends/__init__.py @@ -2,13 +2,25 @@ def BackendMPI(*args,**kwargs): + # import and setup module mpimanager + import abcpy.backends.mpimanager + master_node_ranks = [0] + process_per_model = 1 + if 'master_node_ranks' in kwargs: + master_node_ranks = kwargs['master_node_ranks'] + if 'process_per_model' in kwargs: + process_per_model = kwargs['process_per_model'] + abcpy.backends.mpimanager.create_mpi_manager(master_node_ranks, process_per_model) + + # import BackendMPI and return and instance from abcpy.backends.mpi import BackendMPI return BackendMPI(*args,**kwargs) + def BackendMPITestHelper(*args,**kwargs): from abcpy.backends.mpi import BackendMPITestHelper return BackendMPITestHelper(*args,**kwargs) def BackendSpark(*args,**kwargs): from abcpy.backends.spark import BackendSpark - return BackendSpark(*args,**kwargs) + return BackendSpark(*args,**kwargs) \ No newline at end of file diff --git a/abcpy/backends/base.py b/abcpy/backends/base.py index 13e6d760..b74bdcd4 100644 --- a/abcpy/backends/base.py +++ b/abcpy/backends/base.py @@ -226,3 +226,12 @@ def __init__(self, object): def value(self): return self.object + +class NestedParallelizationController(): + @abstractmethod + def nested_execution(self): + raise NotImplementedError + + @abstractmethod + def run_nested(self, func, *args, **kwargs): + raise NotImplementedError diff --git a/abcpy/backends/mpi.py b/abcpy/backends/mpi.py index c792979c..3816becb 100644 --- a/abcpy/backends/mpi.py +++ b/abcpy/backends/mpi.py @@ -1,17 +1,100 @@ +# noinspection PyInterpreter +import cloudpickle +import numpy as np import pickle import time +import logging -import cloudpickle -import numpy as np from mpi4py import MPI -from abcpy.backends import BDS, PDS, Backend +from abcpy.backends import BDS, PDS, Backend, NestedParallelizationController + + +import abcpy.backends.mpimanager +from mpi4py import MPI -class BackendMPIMaster(Backend): - """Defines the behavior of the master process +class NestedParallelizationControllerMPI(NestedParallelizationController): + def __init__(self, mpi_comm): + self.logger = logging.getLogger(__name__) + self.logger.info("#### Initialize NPC ####") + self.loop_workers = True + self.mpi_comm = mpi_comm + self.nested_func = "NoFunction" + self.func_args = () + self.func_kwargs = {} + self.result = None + if self.mpi_comm.Get_rank() != 0: + self.nested_execution() + + + def communicator(self): + return self.mpi_comm + + + def nested_execution(self): + rank = self.mpi_comm.Get_rank() + self.logger.debug("Starting nested loop on rank {}".format(rank)) + while self.loop_workers: + self.mpi_comm.barrier() + self.loop_workers = self.mpi_comm.bcast(self.loop_workers, root=0) + if self.loop_workers == False: + return + func_p = None + func_args_p = None + func_kwargs_p = None + if self.mpi_comm.Get_rank() == 0: + self.logger.debug("Start pickling func on rank {}".format(rank)) + func_p = cloudpickle.dumps(self.nested_func, pickle.HIGHEST_PROTOCOL) + func_args_p = cloudpickle.dumps(self.func_args, pickle.HIGHEST_PROTOCOL) + func_kwargs_p = cloudpickle.dumps(self.func_kwargs, pickle.HIGHEST_PROTOCOL) + + self.logger.debug("Broadcasting function {} on rank {}".format(self.nested_func, rank)) + func_p = self.mpi_comm.bcast(func_p, root=0) + func_args_p = self.mpi_comm.bcast(func_args_p, root=0) + func_kwargs_p = self.mpi_comm.bcast(func_kwargs_p, root=0) + self.nested_func = cloudpickle.loads(func_p) + self.func_args = cloudpickle.loads(func_args_p) + self.func_kwargs = cloudpickle.loads(func_kwargs_p) + + func = self.nested_func + self.logger.debug("Starting map function {} on rank {}".format(func.__name__, self.mpi_comm.Get_rank())) + self.func_kwargs['mpi_comm'] = self.mpi_comm + self.mpi_comm.barrier() + self.result = func(*(self.func_args), **(self.func_kwargs)) + self.logger.debug("Ending map function on rank {}".format(self.mpi_comm.Get_rank())) + self.mpi_comm.barrier() + if self.mpi_comm.Get_rank() == 0: + return + self.loop_workers = True + self.logger.debug("Ending nested loop on rank {}".format(self.mpi_comm.Get_rank())) + + def run_nested(self, func, *args, **kwargs): + self.logger.debug("Executing nested function {}.".format(func.__name__)) + self.nested_func = func + self.func_args = args + self.func_kwargs = kwargs + self.nested_execution() + self.logger.debug("Return from nested execution of master rank") + self.nested_func = None + self.func_args = () + self.func_kwargs = {} + self.logger.info(self.result) + return self.result - This class defines the behavior of the master process (The one + def __del__(self): + rank = self.mpi_comm.Get_rank() + self.logger.debug("Stopping npc on rank {}".format(rank)) + self.loop_workers = False + if rank == 0: + self.mpi_comm.barrier() + self.loop_workers = self.mpi_comm.bcast(self.loop_workers, root=0) + self.logger.debug(">>>>>>>> NPC stopped on rank {}".format(rank)) + +class BackendMPIScheduler(Backend): + """Defines the behavior of the scheduler process + + This class defines the behavior of the scheduler process (The one with rank==0) in MPI. """ @@ -20,50 +103,40 @@ class BackendMPIMaster(Backend): OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7] finalized = False - def __init__(self, master_node_ranks=[0],chunk_size=1): + def __init__(self, chunk_size=1): """ Parameters ---------- - master_node_ranks: Python list - list of ranks computation should not happen on. - Should include the master so it doesn't get - overwhelmed with work. - chunk_size: Integer size of one block of data to be sent to free - executors + execution teams """ - self.comm = MPI.COMM_WORLD - self.size = self.comm.Get_size() - self.rank = self.comm.Get_rank() - - self.master_node_ranks = master_node_ranks #Initialize the current_pds_id and bds_id self.__current_pds_id = 0 self.__current_bds_id = 0 - #Initialize a BDS store for both master & slave. + #Initialize a BDS store for both scheduler & team. self.bds_store = {} self.pds_store = {} #Initialize a store for the pds data that - #.. hasn't been sent to the workers yet + #.. hasn't been sent to the teams yet self.pds_pending_store = {} self.chunk_size = chunk_size - def __command_slaves(self, command, data): - """Tell slaves to enter relevant execution block - This method handles the sending of the command to the slaves + def __command_teams(self, command, data): + """Tell teams to enter relevant execution block + This method handles the sending of the command to the teams telling them what operation to perform next. Parameters ---------- command: operation code of OP_xxx One of the operation codes defined in the class definition as OP_xxx - which tell the slaves what operation they're performing. + which tell the teams what operation they're performing. data: tuple Any of the data required for the operation which needs to be bundled in the data packet sent. @@ -76,7 +149,6 @@ def __command_slaves(self, command, data): elif command == self.OP_MAP: #In map we receive data as (pds_id,pds_id_new,func) #Use cloudpickle to dump the function into a string. - # function_packed = self.__sanitize_and_pack_func() function_packed = cloudpickle.dumps(data[2],pickle.HIGHEST_PROTOCOL) data_packet = (command, data[0], data[1], function_packed) @@ -94,14 +166,14 @@ def __command_slaves(self, command, data): elif command == self.OP_FINISH: data_packet = (command,) - _ = self.comm.bcast(data_packet, root=0) + _ = self.mpimanager.get_scheduler_communicator().bcast(data_packet, root=0) def __generate_new_pds_id(self): """ This method generates a new pds_id to associate a PDS with it's remote counterpart - that slaves use to store & index data based on the pds_id they receive + that teams use to store & index data based on the pds_id they receive Returns ------- @@ -116,7 +188,7 @@ def __generate_new_pds_id(self): def __generate_new_bds_id(self): """ This method generates a new bds_id to associate a BDS with it's remote counterpart - that slaves use to store & index data based on the bds_id they receive + that teams use to store & index data based on the bds_id they receive Returns ------- @@ -130,18 +202,18 @@ def __generate_new_bds_id(self): def parallelize(self, python_list): """ - This method distributes the list on the available workers and returns a + This method distributes the list on the available teams and returns a reference object. - The list is split into number of workers many parts as a numpy array. - Each part is sent to a separate worker node using the MPI scatter. + The list is split into number of teams many parts as a numpy array. + Each part is sent to a separate team node using the MPI scatter. - MASTER: python_list is the real data that is to be split up + scheduler: python_list is the real data that is to be split up Parameters ---------- list: Python list - the list that should get distributed on the worker nodes + the list that should get distributed on the leader nodes of the teams Returns ------- @@ -149,37 +221,36 @@ def parallelize(self, python_list): A reference object that represents the parallelized list """ - # Tell the slaves to enter parallelize() + # Tell the teams to enter parallelize() pds_id = self.__generate_new_pds_id() - self.__command_slaves(self.OP_PARALLELIZE, (pds_id,)) + self.__command_teams(self.OP_PARALLELIZE, (pds_id,)) #Don't send any data. Just keep it as a queue we're going to pop. self.pds_store[pds_id] = list(python_list) - pds = PDSMPI([], pds_id, self) return pds def orchestrate_map(self,pds_id): - """Orchestrates the slaves/workers to perform a map function + """Orchestrates the teams to perform a map function - This works by keeping track of the workers who haven't finished executing, + This works by keeping track of the teams who haven't finished executing, waiting for them to request the next chunk of data when they are free, responding to them with the data and then sending them a Sentinel signalling that they can exit. """ - is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)] + is_map_done = [True if i in self.mpimanager.get_scheduler_node_ranks() else False for i in range(self.mpimanager.get_scheduler_size())] status = MPI.Status() - #Copy it to the pending. This is so when master accesses + #Copy it to the pending. This is so when scheduler accesses #the PDS data it's not empty. self.pds_pending_store[pds_id] = list(self.pds_store[pds_id]) #While we have some ranks that haven't finished - while sum(is_map_done) 1): + npc = NestedParallelizationControllerMPI(self.mpimanager.get_model_communicator()) + if self.mpimanager.get_model_communicator().Get_rank() == 0: + self.logger.debug("Executing map function on master rank 0.") + res = func(data_item, npc=npc) + del(npc) + else: + res = func(data_item) + except Exception as e: + msg = "Exception occured while calling the map function {}: ".format(func.__name__) + res = type(e)(msg + str(e)) + return res - #Initialize a BDS store for both master & slave. - self.bds_store = {} - #Go into an infinite loop waiting for commands from the user. - self.slave_run() + def __worker_run(self): + """ + Workers enter an infinite loop and waits for instructions from their leader + """ + while True: + data = self.mpimanager.get_model_communicator().bcast(None, root=0) + op = data[0] + if op == self.OP_MAP: + #Receive data from scheduler of the model + function_packed = self.mpimanager.get_model_communicator().bcast(None, root=0)[0] + data_item = self.mpimanager.get_model_communicator().bcast(None, root=0)[0] + self.run_function(function_packed, data_item) + elif op == self.OP_BROADCAST: + self._bds_id = data[1] + self.broadcast(None) + elif op == self.OP_FINISH: + quit() + else: + raise Exception("worker model received unknown command code") + + def collect(self): + pass + + def map(self): + pass + def parallelize(): + pass - def slave_run(self): + def broadcast(self, value): + """ + Receives data from scheduler """ - This method is the infinite loop a slave enters directly from init. - It makes the slave wait for a command to perform from the master and + value = self.mpimanager.get_world_communicator().bcast(None, root=0) + self.bds_store[self._bds_id] = value + + +class BackendMPILeader(BackendMPIWorker): + """Defines the behavior of the leader processes + + This class defines how the leaders should behave during operation. + leaders are those processes(not nodes like Spark) that have rank==0 in the model communicator + """ + + OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7] + + + def __init__(self): + """ No parameter, just call leader_run """ + self.logger = logging.getLogger(__name__) + self.__leader_run() + + + + def __leader_run(self): + """ + This method is the infinite loop a leader enters directly from init. + It makes the leader wait for a command to perform from the scheduler and then calls the appropriate function. This method also takes care of the synchronization of data between the - master and the slaves by matching PDSs based on the pds_ids sent by the master + scheduler and the leaders by matching PDSs based on the pds_ids sent by the scheduler with the command. - Commands received from the master are of the form of a tuple. + Commands received from the scheduler are of the form of a tuple. The first component of the tuple is always the operation to be performed and the rest are conditional on the operation. (op,pds_id) where op == OP_PARALLELIZE for parallelize (op,pds_id, pds_id_result,func) where op == OP_MAP for map. (op,pds_id) where op == OP_COLLECT for a collect operation - (op,pds_id) where op == OP_DELETEPDS for a delete of the remote PDS on slaves - (op,) where op==OP_FINISH for the slave to break out of the loop and terminate + (op,pds_id) where op == OP_DELETEPDS for a delete of the remote PDS on leaders + (op,) where op==OP_FINISH for the leader to break out of the loop and terminate """ - # Initialize PDS data store here because only slaves need to do it. + # Initialize PDS data store here because only teams need to do it. self.pds_store = {} while True: - data = self.comm.bcast(None, root=0) + data = self.mpimanager.get_scheduler_communicator().bcast(None, root=0) op = data[0] if op == self.OP_PARALLELIZE: pds_id = data[1] - self.__rec_pds_id = pds_id + self._rec_pds_id = pds_id pds_id, pds_id_new = self.__get_received_pds_id() self.pds_store[pds_id] = None elif op == self.OP_MAP: pds_id, pds_id_result, function_packed = data[1:] - self.__rec_pds_id, self.__rec_pds_id_result = pds_id, pds_id_result - - #Use cloudpickle to convert back function string to a function - func = cloudpickle.loads(function_packed) + self._rec_pds_id, self._rec_pds_id_result = pds_id, pds_id_result #Enter the map so we can grab data and perform the func. #Func sent before and not during for performance reasons - pds_res = self.map(func) + pds_res = self.map(function_packed) # Store the result in a newly gnerated PDS pds_id self.pds_store[pds_res.pds_id] = pds_res elif op == self.OP_BROADCAST: - self.__bds_id = data[1] + self._bds_id = data[1] + #relay command and data into model communicator + self.mpimanager.get_model_communicator().bcast(data, root=0) self.broadcast(None) elif op == self.OP_COLLECT: @@ -442,34 +578,44 @@ def slave_run(self): del self.bds_store[bds_id] elif op == self.OP_FINISH: + # tells other processes of the worker to finish + self.mpimanager.get_model_communicator().bcast([self.OP_FINISH], root=0) quit() else: - raise Exception("Slave recieved unknown command code") + raise Exception("team received unknown command code") def __get_received_pds_id(self): """ - Function to retrieve the pds_id(s) we received from the master to associate - our slave's created PDS with the master's. + Function to retrieve the pds_id(s) we received from the scheduler to associate + our team's created PDS with the scheduler's. """ - return self.__rec_pds_id, self.__rec_pds_id_result + return self._rec_pds_id, self._rec_pds_id_result + def __leader_run_function(self, function_packed, data_item): + """ + This function sends data and serialized function to workers and executes it + """ + self.mpimanager.get_model_communicator().bcast([self.OP_MAP], root=0) + self.mpimanager.get_model_communicator().bcast([function_packed], root=0) + self.mpimanager.get_model_communicator().bcast([data_item], root=0) + return self.run_function(function_packed, data_item) def parallelize(self): pass - def map(self, func): + def map(self, function_packed): """ A distributed implementation of map that works on parallel data sets (PDS). - On every element of pds the function func is called. + We consider that process 0 of each MPI model should return the final result. Parameters ---------- - func: Python func - A function that can be applied to every element of the pds + func: Python function_packed + A serialized function that can be applied to every element of the pds Returns ------- @@ -485,8 +631,8 @@ def map(self, func): rdd = [] while True: #Ask for a chunk of data since it's free - data_chunks = self.comm.sendrecv(pds_id, 0, pds_id) - + data_chunks = self.mpimanager.get_scheduler_communicator().sendrecv(pds_id, 0, pds_id) + #If it receives a sentinel, it's done and it can exit if data_chunks is None: break @@ -494,11 +640,8 @@ def map(self, func): #Accumulate the indicess and *processed* chunks for chunk in data_chunks: data_index,data_item = chunk - try: - result = func(data_item) - except Exception as e: - result = e - rdd.append((data_index, result)) + res = self.__leader_run_function(function_packed, data_item) + rdd+=[(data_index,res)] pds_res = PDSMPI(rdd, pds_id_new, self) @@ -507,8 +650,8 @@ def map(self, func): def collect(self, pds): """ - Gather the pds from all the workers, - send it to the master and return it as a standard Python list. + Gather the pds from all the leaders, + send it to the scheduler and return it as a standard Python list. Parameters ---------- @@ -521,47 +664,86 @@ def collect(self, pds): all elements of pds as a list """ - #Send the data we have back to the master - _ = self.comm.gather(pds.python_list, root=0) + #Send the data we have back to the scheduler + _ = self.mpimanager.get_scheduler_communicator().gather(pds.python_list, root=0) - def broadcast(self, value): - """ - Value is ignored for the slaves. We get data from master - """ - value = self.comm.bcast(None, root=0) - self.bds_store[self.__bds_id] = value +class BackendMPITeam(BackendMPILeader if abcpy.backends.mpimanager.get_mpi_manager().is_leader() else BackendMPIWorker): + """ + A team is compounded by workers and a leader. One process per team is a leader, others are workers + """ + + OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7] + + def __init__(self): + #Define the vars that will hold the pds ids received from scheduler to operate on + self._rec_pds_id = None + self._rec_pds_id_result = None + + #Initialize a BDS store for both scheduler & team. + self.bds_store = {} + + #print("In BackendMPITeam, rank : ", self.rank, ", model_rank_global : ", globals()['model_rank_global']) + + self.logger = logging.getLogger(__name__) + super().__init__() -class BackendMPI(BackendMPIMaster if MPI.COMM_WORLD.Get_rank() == 0 else BackendMPISlave): + + +class BackendMPI(BackendMPIScheduler if abcpy.backends.mpimanager.get_mpi_manager().is_scheduler() else BackendMPITeam): """A backend parallelized by using MPI - The backend conditionally inherits either the BackendMPIMaster class - or the BackendMPISlave class depending on it's rank. This lets + The backend conditionally inherits either the BackendMPIScheduler class + or the BackendMPIteam class depending on it's rank. This lets BackendMPI have a uniform interface for the user but allows for a - logical split between functions performed by the master - and the slaves. + logical split between functions performed by the scheduler + and the teams. """ - def __init__(self, master_node_ranks=[0]): - self.comm = MPI.COMM_WORLD - self.size = self.comm.Get_size() - self.rank = self.comm.Get_rank() + def __init__(self, scheduler_node_ranks=[0], process_per_model=1): + """ + Parameters + ---------- + scheduler_node_ranks: Python list + list of scheduler nodes + + process_per_model: Integer + number of MPI processes to allocate to each model + """ + # get mpimanager instance from the mpimanager module (which has to be setup before calling the constructor) + self.logger = logging.getLogger(__name__) + self.mpimanager = abcpy.backends.mpimanager.get_mpi_manager() - if self.size < 2: + if self.mpimanager.get_world_size() < 2: raise ValueError('A minimum of 2 ranks are required for the MPI backend') - #Set the global backend globals()['backend'] = self - #Call the appropriate constructors and pass the required data - if self.rank == 0: - super().__init__(master_node_ranks) - else: - super().__init__() - raise Exception("Slaves exitted main loop.") + super().__init__() + + + def size(self): + """ Returns world size """ + return self.mpimanager.get_world_size() + + def scheduler_node_ranks(self): + """ Returns scheduler node ranks """ + return self.mpimanager.get_scheduler_node_ranks() + + + @staticmethod + def disable_nested(mpi_comm): + if mpi_comm.Get_rank() != 0: + mpi_comm.Barrier() + + + @staticmethod + def enable_nested(mpi_comm): + if mpi_comm.Get_rank() == 0: + mpi_comm.Barrier() @@ -578,12 +760,12 @@ def __init__(self, python_list, pds_id, backend_obj): def __del__(self): """ Destructor to be called when a PDS falls out of scope and/or is being deleted. - Uses the backend to send a message to destroy the slaves' copy of the pds. + Uses the backend to send a message to destroy the teams' copy of the pds. """ try: self.backend_obj.delete_remote_pds(self.pds_id) except AttributeError: - #Catch "delete_remote_pds not defined" for slaves and ignore. + #Catch "delete_remote_pds not defined" for teams and ignore. pass @@ -597,7 +779,6 @@ def __init__(self, object, bds_id, backend_obj): #It will access & store the data only from the current backend self.bds_id = bds_id backend.bds_store[self.bds_id] = object - # self.backend_obj = backend_obj def value(self): """ @@ -608,13 +789,13 @@ def value(self): def __del__(self): """ Destructor to be called when a BDS falls out of scope and/or is being deleted. - Uses the backend to send a message to destroy the slaves' copy of the bds. + Uses the backend to send a message to destroy the teams' copy of the bds. """ try: backend.delete_remote_bds(self.bds_id) except AttributeError: - #Catch "delete_remote_pds not defined" for slaves and ignore. + #Catch "delete_remote_pds not defined" for teams and ignore. pass class BackendMPITestHelper: diff --git a/abcpy/backends/mpimanager.py b/abcpy/backends/mpimanager.py new file mode 100644 index 00000000..17a7b33c --- /dev/null +++ b/abcpy/backends/mpimanager.py @@ -0,0 +1,133 @@ +from mpi4py import MPI +import sys + +__mpimanager = None + +class MPIManager(object): + """Defines the behavior of the slaves/worker processes + + This class construct the MPI communicators structure needed + if the rank of the process is in scheduler_node_ranks, the process is a scheduler + then there is process_per_model process per communicator + """ + + def __init__(self, scheduler_node_ranks=[0], process_per_model=1): + """ + Parameters + ---------- + scheduler_node_ranks: Python list + list of ranks computation should not happen on. + Should include the scheduler so it doesn't get + overwhelmed with work. + + process_per_model: Integer + the number of process to allow to each model + """ + + self._world_communicator = MPI.COMM_WORLD + self._size = self._world_communicator.Get_size() + self._rank = self._world_communicator.Get_rank() + + #Construct the appropriate communicators for resource allocation to models + #There is one communicator for scheduler nodes + #And one communicator per model + self._scheduler_node_ranks = scheduler_node_ranks + self._process_per_model = process_per_model + self._model_color = int(((self._rank - sum(i < self._rank for i in scheduler_node_ranks)) / process_per_model) + 1) + if(self._rank in scheduler_node_ranks): + self._model_color = 0 + self._model_communicator = MPI.COMM_WORLD.Split(self._model_color, self._rank) + self._model_size = self._model_communicator.Get_size() + self._model_rank = self._model_communicator.Get_rank() + + # create a communicator to broadcast instructions to slaves + self._scheduler_color = 1 + if(self._model_color == 0 or self._model_rank == 0): + self._scheduler_color = 0 + self._scheduler_communicator = MPI.COMM_WORLD.Split(self._scheduler_color, self._rank) + self._scheduler_size = self._scheduler_communicator.Get_size() + self._scheduler_rank = self._scheduler_communicator.Get_rank() + + self._leader = False + self._scheduler = False + self._team = False + self._worker = False + + if self._rank == 0: + self._scheduler = True + elif self._model_rank == 0: + self._team = True + self._leader = True + else: + self._team = True + self._worker = True + + + def is_scheduler(self): + ''' Tells if the process is a scheduler ''' + return self._scheduler + + def is_team(self): + ''' Tells if the process is a team ''' + return self._team + + def is_leader(self): + ''' Tells if the process is a leader ''' + return self._leader + + def is_worker(self): + ''' Tells if the process is a worker ''' + return self._worker + + def get_scheduler_node_ranks(self): + ''' Returns the list of scheduler node wanks ''' + return self._scheduler_node_ranks + + def get_world_rank(self): + ''' Returns the current rank ''' + return self._rank + + def get_world_size(self): + ''' Returns the size of the world communicator ''' + return self._size + + def get_world_communicator(self): + ''' Returns the world communicator ''' + return self._world_communicator + + def get_model_rank(self): + ''' Returns the rank in the world communicator ''' + return self._model_rank + + def get_model_size(self): + ''' Returns the size of the model communicator ''' + return self._model_size + + def get_model_communicator(self): + ''' Returns the model communicator ''' + return self._model_communicator + + def get_scheduler_rank(self): + ''' Returns the rank in the scheduler communicator ''' + return self._scheduler_rank + + def get_scheduler_size(self): + ''' Returns the size of the scheduler communicator ''' + return self._scheduler_size + + def get_scheduler_communicator(self): + ''' Returns the scheduler communicator ''' + return self._scheduler_communicator + +def get_mpi_manager(): + ''' Return the instance of mpimanager + Creates one with default parameters is not already existing ''' + global mpimanager + if mpimanager == None : + create_mpi_manager([0], 1) + return mpimanager + +def create_mpi_manager(scheduler_node_ranks, process_per_model): + ''' Creates the instance of mpimanager with given parameters ''' + global mpimanager + mpimanager = MPIManager(scheduler_node_ranks, process_per_model) \ No newline at end of file diff --git a/abcpy/continuousmodels.py b/abcpy/continuousmodels.py index a06be373..bf8ada5d 100644 --- a/abcpy/continuousmodels.py +++ b/abcpy/continuousmodels.py @@ -65,7 +65,7 @@ def _check_output(self, parameters): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from a uniform distribution using the current values for each probabilistic model from which the model derives. @@ -167,7 +167,7 @@ def _check_output(self, parameters): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from a normal distribution using the current values for each probabilistic model from which the model derives. @@ -248,7 +248,7 @@ def __init__(self, parameters, name='StudentT'): super(StudentT, self).__init__(input_parameters, name) self.visited = False - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from a Student's T-distribution using the current values for each probabilistic model from which the model derives. @@ -398,7 +398,7 @@ def _check_output(self, parameters): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from a multivariate normal distribution using the current values for each probabilistic model from which the model derives. @@ -532,7 +532,7 @@ def _check_output(self, parameters): """ return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from a multivariate Student's T-distribution using the current values for each probabilistic model from which the model derives. diff --git a/abcpy/discretemodels.py b/abcpy/discretemodels.py index 41431ad4..61bb3cb4 100644 --- a/abcpy/discretemodels.py +++ b/abcpy/discretemodels.py @@ -52,7 +52,7 @@ def _check_output(self, parameters): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from the bernoulli distribution associtated with the probabilistic model. @@ -157,7 +157,7 @@ def _check_output(self, parameters): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples from a binomial distribution using the current values for each probabilistic model from which the model derives. @@ -256,7 +256,7 @@ def _check_output(self, parameters): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """ Samples k values from the defined possion distribution. diff --git a/abcpy/distances.py b/abcpy/distances.py index 73ca3bfa..fa413f26 100644 --- a/abcpy/distances.py +++ b/abcpy/distances.py @@ -117,7 +117,7 @@ def __init__(self, statistics): # summary statistics of them and not recalculate it each time self.s1 = None self.data_set = None - + self.dataSame = False def distance(self, d1, d2): """Calculates the distance between two datasets. @@ -127,15 +127,24 @@ def distance(self, d1, d2): d1, d2: list A list, containing a list describing the data set """ + if not isinstance(d1, list): raise TypeError('Data is not of allowed types') if not isinstance(d2, list): raise TypeError('Data is not of allowed types') + # Check whether d1 is same as self.data_set + if self.data_set is not None: + if len(np.array(d1[0]).reshape(-1,)) == 1: + self.data_set == d1 + else: + self.dataSame = all([(np.array(self.data_set[i]) == np.array(d1[i])).all() for i in range(len(d1))]) + # Extract summary statistics from the dataset - if(self.s1 is None or self.data_set!=d1): + if(self.s1 is None or self.dataSame is False): self.s1 = self.statistics_calc.statistics(d1) self.data_set = d1 + s2 = self.statistics_calc.statistics(d2) # compute distance between the statistics @@ -178,6 +187,7 @@ def __init__(self, statistics): # Since the observations do always stay the same, we can save the summary statistics of them and not recalculate it each time self.s1 = None self.data_set = None + self.dataSame = False def distance(self, d1, d2): """Calculates the distance between two datasets. @@ -192,8 +202,15 @@ def distance(self, d1, d2): if not isinstance(d2, list): raise TypeError('Data is not of allowed types') + # Check whether d1 is same as self.data_set + if self.data_set is not None: + if len(np.array(d1[0]).reshape(-1,)) == 1: + self.data_set == d1 + else: + self.dataSame = all([(np.array(self.data_set[i]) == np.array(d1[i])).all() for i in range(len(d1))]) + # Extract summary statistics from the dataset - if(self.s1 is None or self.data_set!=d1): + if(self.s1 is None or self.dataSame is False): self.s1 = self.statistics_calc.statistics(d1) self.data_set = d1 s2 = self.statistics_calc.statistics(d2) @@ -229,6 +246,7 @@ def __init__(self, statistics): # Since the observations do always stay the same, we can save the summary statistics of them and not recalculate it each time self.s1 = None self.data_set = None + self.dataSame = False def distance(self, d1, d2): """Calculates the distance between two datasets. @@ -238,13 +256,22 @@ def distance(self, d1, d2): d1, d2: list A list, containing a list describing the data set """ + + if not isinstance(d1, list): raise TypeError('Data is not of allowed types') if not isinstance(d2, list): raise TypeError('Data is not of allowed types') + # Check whether d1 is same as self.data_set + if self.data_set is not None: + if len(np.array(d1[0]).reshape(-1,)) == 1: + self.data_set == d1 + else: + self.dataSame = all([(np.array(self.data_set[i]) == np.array(d1[i])).all() for i in range(len(d1))]) + # Extract summary statistics from the dataset - if(self.s1 is None or self.data_set!=d1): + if(self.s1 is None or self.dataSame is False): self.s1 = self.statistics_calc.statistics(d1) self.data_set = d1 s2 = self.statistics_calc.statistics(d2) diff --git a/abcpy/graphtools.py b/abcpy/graphtools.py index d0f706ef..edfb4de5 100644 --- a/abcpy/graphtools.py +++ b/abcpy/graphtools.py @@ -388,7 +388,7 @@ def get_correct_ordering(self, parameters_and_models, models=None, is_root = Tru return ordered_parameters - def simulate(self, n_samples_per_param, rng=np.random.RandomState()): + def simulate(self, n_samples_per_param, rng=np.random.RandomState(), npc=None): """Simulates data of each model using the currently sampled or perturbed parameters. Parameters @@ -405,7 +405,10 @@ def simulate(self, n_samples_per_param, rng=np.random.RandomState()): for model in self.model: parameters_compatible = model._check_input(model.get_input_values()) if parameters_compatible: - simulation_result = model.forward_simulate(model.get_input_values(), n_samples_per_param, rng=rng) + if npc is not None and npc.communicator().Get_size() > 1: + simulation_result = npc.run_nested(model.forward_simulate, model.get_input_values(), n_samples_per_param, rng=rng) + else: + simulation_result = model.forward_simulate(model.get_input_values(),n_samples_per_param, rng=rng) result.append(simulation_result) else: return None diff --git a/abcpy/inferences.py b/abcpy/inferences.py index 28c8dbb8..7714f82d 100644 --- a/abcpy/inferences.py +++ b/abcpy/inferences.py @@ -2,6 +2,8 @@ import logging import numpy as np +import sys + from abc import ABCMeta, abstractmethod, abstractproperty from scipy import optimize @@ -153,7 +155,7 @@ class RejectionABC(InferenceMethod): Distance object defining the distance measure to compare simulated and observed data sets. backend: abcpy.backends.Backend Backend object defining the backend to be used. - seed: integer, optional + seed: integer, optionaldistance Optional initial seed for the random number generator. The default value is generated randomly. """ @@ -246,7 +248,7 @@ def sample(self, observations, n_samples, n_samples_per_param, epsilon, full_out return journal - def _sample_parameter(self, rng): + def _sample_parameter(self, rng, npc=None): """ Samples a single model parameter and simulates from it until distance between simulated outcome and the observation is @@ -274,7 +276,7 @@ def _sample_parameter(self, rng): # Accept new parameter value if the distance is less than epsilon self.sample_from_prior(rng=rng) theta = np.array(self.get_parameters(self.model)).reshape(-1,) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 if(y_sim is not None): distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) @@ -431,27 +433,26 @@ def sample(self, observations, steps, epsilon_init, n_samples = 10000, n_samples # Since each entry of new_cov_mats is a numpy array, we can multiply like this accepted_cov_mats = [covFactor * new_cov_mat for new_cov_mat in new_cov_mats] + seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_samples, dtype=np.uint32) rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr]) rng_pds = self.backend.parallelize(rng_arr) # 0: update remotely required variables - # print("INFO: Broadcasting parameters.") + #print("INFO: Broadcasting parameters.") self.logger.info("Broadcasting parameters") self.epsilon = epsilon_arr[aStep] self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters, accepted_weights, accepted_cov_mats) # 1: calculate resample parameters - # print("INFO: Resampling parameters") + #print("INFO: Resampling parameters") self.logger.info("Resamping parameters") - params_and_dists_and_ysim_and_counter_pds = self.backend.map(self._resample_parameter, rng_pds) - params_and_dists_and_ysim_and_counter = self.backend.collect(params_and_dists_and_ysim_and_counter_pds) - new_parameters, distances, counter = [list(t) for t in zip(*params_and_dists_and_ysim_and_counter)] + params_and_dists_and_counter_pds = self.backend.map(self._resample_parameter, rng_pds) + params_and_dists_and_counter = self.backend.collect(params_and_dists_and_counter_pds) + new_parameters, distances, counter = [list(t) for t in zip(*params_and_dists_and_counter)] new_parameters = np.array(new_parameters) - #print(new_parameters) - for count in counter: self.simulation_counter+=count @@ -464,6 +465,7 @@ def sample(self, observations, steps, epsilon_init, n_samples = 10000, n_samples else: epsilon_arr[aStep + 1] = np.max( [np.percentile(distances, epsilon_percentile), epsilon_arr[aStep + 1]]) + # 2: calculate weights for new parameters self.logger.info("Calculating weights") @@ -514,8 +516,7 @@ def sample(self, observations, steps, epsilon_init, n_samples = 10000, n_samples return journal - # define helper functions for map step - def _resample_parameter(self, rng): + def _resample_parameter(self, rng, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is @@ -531,6 +532,8 @@ def _resample_parameter(self, rng): np.array accepted parameter """ + + #print(npc.communicator()) rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) distance = self.distance.dist_max() @@ -543,12 +546,10 @@ def _resample_parameter(self, rng): counter=0 while distance > self.epsilon: - #print( " distance: " + str(distance) + " epsilon: " + str(self.epsilon)) - if self.accepted_parameters_manager.accepted_parameters_bds == None: self.sample_from_prior(rng=rng) theta = self.get_parameters() - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 else: @@ -560,24 +561,23 @@ def _resample_parameter(self, rng): if(perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1])!=0): theta = perturbation_output[1] break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 - if(y_sim is not None): - distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),y_sim) - self.logger.debug("distance after {:4d} simulations: {:e}".format( - counter, distance)) - else: - distance = self.distance.dist_max() + distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) + + self.logger.debug("distance after {:4d} simulations: {:e}".format( + counter, distance)) self.logger.debug( "Needed {:4d} simulations to reach distance {:e} < epsilon = {:e}". format(counter, distance, float(self.epsilon)) ) - return (theta, distance, counter) - def _calculate_weight(self, theta): + return None + + def _calculate_weight(self, theta, npc=None): """ Calculates the weight for the given parameter using accepted_parameters, accepted_cov_mat @@ -747,7 +747,8 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters, accepted_weights=accepted_weights) - # The parameters relevant to each kernel have to be used to calculate n_sample times. It is therefore more efficient to broadcast these parameters once, instead of collecting them at each kernel in each step + # The parameters relevant to each kernel have to be used to calculate n_sample times. It is therefore more efficient + # to broadcast these parameters once, instead of collecting them at each kernel in each step kernel_parameters = [] for kernel in self.kernel.kernels: kernel_parameters.append( @@ -812,10 +813,10 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 # 2: calculate approximate lieklihood for new parameters self.logger.info("Calculate approximate likelihood") merged_sim_data_parameter = self.flat_map(new_parameters, self.n_samples_per_param, self._simulate_data) + # Compute likelihood for each parameter value approx_likelihood_new_parameters, counter = self.simple_map(merged_sim_data_parameter, self._approx_calc) approx_likelihood_new_parameters = np.array(approx_likelihood_new_parameters).reshape(-1, 1) - for count in counter: self.simulation_counter+=count @@ -829,6 +830,8 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 for i in range(0, self.n_samples): new_weights[i] = new_weights[i] * approx_likelihood_new_parameters[i] sum_of_weights += new_weights[i] + + #print("new_weights : ", new_weights, ", sum_of_weights : ", sum_of_weights) new_weights = new_weights / sum_of_weights accepted_parameters = new_parameters @@ -880,10 +883,18 @@ def simple_map(self, data, map_function): main_result, counter = [list(t) for t in zip(*result)] return main_result, counter def flat_map(self, data, n_repeat, map_function): + # Create an array of data, with each data repeated n_repeat many times repeated_data = np.repeat(data, n_repeat, axis=0) - repeated_data_pds = self.backend.parallelize(repeated_data) - repeated_data__result_pds = self.backend.map(map_function, repeated_data_pds) - repeated_data_result = self.backend.collect(repeated_data__result_pds) + # Create an see array + n_total = n_repeat * data.shape[0] + seed_arr = self.rng.randint(1, n_total * n_total, size=n_total, dtype=np.int32) + rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr]) + # Create data and rng array + repeated_data_rng = [[repeated_data[ind,:],rng_arr[ind]] for ind in range(n_total)] + repeated_data_rng_pds = self.backend.parallelize(repeated_data_rng) + # Map the function on the data using the corresponding rng + repeated_data_result_pds = self.backend.map(map_function, repeated_data_rng_pds) + repeated_data_result = self.backend.collect(repeated_data_result_pds) repeated_data, result = [list(t) for t in zip(*repeated_data_result)] merged_result_data = [] for ind in range(0, data.shape[0]): @@ -894,7 +905,7 @@ def flat_map(self, data, n_repeat, map_function): return merged_result_data # define helper functions for map step - def _simulate_data(self, theta): + def _simulate_data(self, data, npc=None): """ Simulate n_sample_per_param many datasets for new parameter Parameters @@ -909,12 +920,12 @@ def _simulate_data(self, theta): # Simulate the fake data from the model given the parameter value theta # print("DEBUG: Simulate model for parameter " + str(theta)) + theta, rng = data[0], data[1] self.set_parameters(theta) - y_sim = self.simulate(1, self.rng) - + y_sim = self.simulate(1, rng, npc=npc) return (theta, y_sim) - def _approx_calc(self, sim_data_parameter): + def _approx_calc(self, sim_data_parameter, npc=None): """ Compute likelihood for new parameters using approximate likelihood function Parameters @@ -926,26 +937,23 @@ def _approx_calc(self, sim_data_parameter): float The approximated likelihood function """ + # Extract data and parameter y_sim, theta = sim_data_parameter[0], sim_data_parameter[1] - # print("DEBUG: Extracting observation.") obs = self.accepted_parameters_manager.observations_bds.value() - # print("DEBUG: Computing likelihood...") total_pdf_at_theta = 1. lhd = self.likfun.likelihood(obs, y_sim) - # print("DEBUG: Likelihood is :" + str(lhd)) pdf_at_theta = self.pdf_of_prior(self.model, theta) total_pdf_at_theta *= (pdf_at_theta * lhd) - # print("DEBUG: prior pdf evaluated at theta is :" + str(pdf_at_theta)) return (total_pdf_at_theta, 1) - def _calculate_weight(self, theta): + def _calculate_weight(self, theta, npc=None): """ Calculates the weight for the given parameter using accepted_parameters, accepted_cov_mat @@ -1393,15 +1401,15 @@ def destroy(bc): self.all_distances_bds = self.backend.broadcast(all_distances) # define helper functions for map step - def _accept_parameter(self, data): + def _accept_parameter(self, data, npc=None): """ Samples a single model parameter and simulate from it until accepted with probabilty exp[-rho(x,y)/epsilon]. Parameters ---------- - seed: integer - Initial seed for the random number generator. + seed_and_index: list of two integers + Initial seed for the random number generator and the index of data to operate on Returns ------- @@ -1426,7 +1434,7 @@ def _accept_parameter(self, data): self.sample_from_prior(rng=rng) new_theta = np.array(self.get_parameters()).reshape(-1,) all_parameters.append(new_theta) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) all_distances.append(distance) @@ -1444,10 +1452,9 @@ def _accept_parameter(self, data): new_theta = np.array(perturbation_output[1]).reshape(-1,) break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) - smooth_distance = self._smoother_distance([distance], self.all_distances_bds.value()) ## Calculate acceptance probability: @@ -1576,15 +1583,14 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 for aStep in range(0, steps): - self.logger.info("Step {}".format(aStep)) - + self.logger.info("ABCsubsim step {}".format(aStep)) if aStep==0 and journal_file is not None: accepted_parameters = journal.parameters[-1] accepted_weights = journal.weights[-1] accepted_cov_mats = journal.opt_values[-1] # main ABCsubsim algorithm - self.logger.info("Initializatio of ABCsubsim") + self.logger.info("Initialization of ABCsubsim") seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=int(n_samples / temp_chain_length), dtype=np.uint32) rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr]) @@ -1602,7 +1608,9 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 # print("INFO: Initial accepted parameter parameters") self.logger.info("Initial accepted parameters") params_and_dists_pds = self.backend.map(self._accept_parameter, rng_and_index_pds) + self.logger.debug("Map random number to a pseudo-observation") params_and_dists = self.backend.collect(params_and_dists_pds) + self.logger.debug("Collect results from the mapping") new_parameters, new_distances, counter = [list(t) for t in zip(*params_and_dists)] for count in counter: @@ -1612,12 +1620,15 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 distances = np.concatenate(new_distances) # 2: Sort and renumber samples + self.logger.debug("Sort and renumber samples.") SortIndex = sorted(range(len(distances)), key=lambda k: distances[k]) distances = distances[SortIndex] accepted_parameters = accepted_parameters[SortIndex, :] + # 3: Calculate and broadcast annealling parameters - temp_chain_length = chain_length + self.logger.debug("Calculate and broadcast annealling parameters.") + temp_chain_length = self.chain_length if aStep > 0: anneal_parameter_old = anneal_parameter anneal_parameter = 0.5 * ( @@ -1626,6 +1637,7 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 # 4: Update proposal covariance matrix (Parallelized) + self.logger.debug("Update proposal covariance matrix (Parallelized).") if aStep == 0: self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters) @@ -1649,7 +1661,9 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 rng_and_index_arr = np.column_stack((rng_arr, index_arr)) rng_and_index_pds = self.backend.parallelize(rng_and_index_arr) + self.logger.debug("Update co-variance matrix in parallel (map).") cov_mats_index_pds = self.backend.map(self._update_cov_mat, rng_and_index_pds) + self.logger.debug("Collect co-variance matrix.") cov_mats_index = self.backend.collect(cov_mats_index_pds) cov_mats, T, accept_index, counter = [list(t) for t in zip(*cov_mats_index)] @@ -1661,6 +1675,7 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 accepted_cov_mats = cov_mats[ind] break + self.logger.debug("Broadcast accepted parameters.") self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats) if full_output == 1: @@ -1700,7 +1715,7 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 return journal # define helper functions for map step - def _accept_parameter(self, rng_and_index): + def _accept_parameter(self, rng_and_index, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is @@ -1732,7 +1747,7 @@ def _accept_parameter(self, rng_and_index): if self.accepted_parameters_manager.accepted_parameters_bds == None: self.sample_from_prior(rng=rng) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) result_theta.append(self.get_parameters()) @@ -1740,7 +1755,7 @@ def _accept_parameter(self, rng_and_index): else: theta = np.array(self.accepted_parameters_manager.accepted_parameters_bds.value()[index]).reshape(-1,) self.set_parameters(theta) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) result_theta.append(theta) @@ -1750,7 +1765,7 @@ def _accept_parameter(self, rng_and_index): perturbation_output = self.perturb(index, rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1])!= 0: break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng,npc=npc) counter+=1 new_distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) @@ -1771,10 +1786,9 @@ def _accept_parameter(self, rng_and_index): else: result_theta.append(theta) result_distance.append(distance) - return (result_theta, result_distance, counter) - def _update_cov_mat(self, rng_t): + def _update_cov_mat(self, rng_t, npc=None): """ Updates the covariance matrix. @@ -1806,24 +1820,27 @@ def _update_cov_mat(self, rng_t): counter = 0 for ind in range(0, self.chain_length): + self.logger.debug("Parameter acceptance loop step {}.".format(ind)) while True: perturbation_output = self.perturb(0, rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 new_distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) + self.logger.debug("Calculate acceptance probability.") ## Calculate acceptance probability: ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, theta) kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,0 , theta) kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,0 , perturbation_output[1]) ratio_likelihood_prob = kernel_numerator / kernel_denominator acceptance_prob = min(1, ratio_prior_prob * ratio_likelihood_prob) * (new_distance < self.anneal_parameter) - ## If accepted if rng.binomial(1, acceptance_prob) == 1: theta = perturbation_output[1] acceptance = acceptance + 1 + + self.logger.debug("Return accepted parameters.") if acceptance / 10 <= 0.5 and acceptance / 10 >= 0.3: return (accepted_cov_mats_transformed, t, 1, counter) else: @@ -2081,7 +2098,7 @@ def destroy(bc): self.accepted_dist_bds = self.backend.broadcast(accepted_dist) # define helper functions for map step - def _accept_parameter(self, rng): + def _accept_parameter(self, rng, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is @@ -2100,7 +2117,6 @@ def _accept_parameter(self, rng): rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) distance = self.distance.dist_max() - mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping( self.accepted_parameters_manager.model) @@ -2109,9 +2125,10 @@ def _accept_parameter(self, rng): if self.accepted_parameters_manager.accepted_parameters_bds == None: while distance > self.epsilon[-1]: self.sample_from_prior(rng=rng) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) + index_accept = 1 else: index = rng.choice(len(self.accepted_parameters_manager.accepted_parameters_bds.value()), size=1) @@ -2122,7 +2139,7 @@ def _accept_parameter(self, rng): perturbation_output = self.perturb(index[0], rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, theta) @@ -2377,7 +2394,7 @@ def destroy(bc): self.accepted_dist_bds = self.backend.broadcast(accepted_dist) # define helper functions for map step - def _accept_parameter(self, rng): + def _accept_parameter(self, rng, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is @@ -2403,9 +2420,10 @@ def _accept_parameter(self, rng): if self.accepted_parameters_manager.accepted_parameters_bds == None: self.sample_from_prior(rng=rng) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 - dist = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) + distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) + weight = 1.0 else: index = rng.choice(len(self.accepted_parameters_manager.accepted_weights_bds.value()), size=1, @@ -2417,9 +2435,9 @@ def _accept_parameter(self, rng): if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 - dist = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) + distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) denominator = 0.0 @@ -2428,7 +2446,7 @@ def _accept_parameter(self, rng): denominator += self.accepted_parameters_manager.accepted_weights_bds.value()[i, 0] * pdf_value weight = 1.0 * prior_prob / denominator - return (self.get_parameters(self.model), dist, weight, counter) + return (self.get_parameters(self.model), distance, weight, counter) class SMCABC(BaseDiscrepancy, InferenceMethod): @@ -2668,7 +2686,6 @@ def sample(self, observations, steps, n_samples = 10000, n_samples_per_param = 1 accepted_parameters = new_parameters accepted_y_sim = new_y_sim - if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1): self.logger.info("Saving configuration to output journal") self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters) @@ -2761,7 +2778,7 @@ def destroy(bc): # define helper functions for map step - def _accept_parameter(self, rng_and_index): + def _accept_parameter(self, rng_and_index, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is @@ -2787,11 +2804,10 @@ def _accept_parameter(self, rng_and_index): self.accepted_parameters_manager.model) counter=0 - # print("on seed " + str(seed) + " distance: " + str(distance) + " epsilon: " + str(self.epsilon)) - if self.accepted_parameters_manager.accepted_parameters_bds == None: + if self.accepted_parameters_manager.accepted_parameters_bds is None: self.sample_from_prior(rng=rng) - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 else: if self.accepted_parameters_manager.accepted_weights_bds.value()[index] > 0: @@ -2800,22 +2816,26 @@ def _accept_parameter(self, rng_and_index): perturbation_output = self.perturb(index, rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break - y_sim = self.simulate(self.n_samples_per_param, rng=rng) + y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter+=1 y_sim_old = self.accepted_y_sim_bds.value()[index] ## Calculate acceptance probability: numerator = 0.0 denominator = 0.0 for ind in range(self.n_samples_per_param): - numerator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), [[y_sim[0][ind]]]) < self.epsilon[-1]) - denominator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), [[y_sim_old[0][ind]]]) < self.epsilon[-1]) + numerator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), + [[y_sim[0][ind]]]) < self.epsilon[-1]) + denominator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), + [[y_sim_old[0][ind]]]) < self.epsilon[-1]) if denominator == 0: ratio_data_epsilon = 1 else: ratio_data_epsilon = numerator / denominator - ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, theta) + ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, + theta) kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, index, theta) - kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, index, perturbation_output[1]) + kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, index, + perturbation_output[1]) ratio_likelihood_prob = kernel_numerator / kernel_denominator acceptance_prob = min(1, ratio_data_epsilon * ratio_prior_prob * ratio_likelihood_prob) if rng.binomial(1, acceptance_prob) == 1: @@ -2827,4 +2847,4 @@ def _accept_parameter(self, rng_and_index): self.set_parameters(self.accepted_parameters_manager.accepted_parameters_bds.value()[index]) y_sim = self.accepted_y_sim_bds.value()[index] - return (self.get_parameters(), y_sim, counter) + return (self.get_parameters(), y_sim, counter) \ No newline at end of file diff --git a/abcpy/jointapprox_lhd.py b/abcpy/jointapprox_lhd.py index 7bf3b319..45028dbf 100644 --- a/abcpy/jointapprox_lhd.py +++ b/abcpy/jointapprox_lhd.py @@ -1,9 +1,5 @@ from abc import ABCMeta, abstractmethod -import numpy as np -from glmnet import LogitNet -from sklearn import linear_model - class JointApprox_likelihood(metaclass = ABCMeta): """This abstract base class defines how the combination of distances computed on the observed and diff --git a/abcpy/jointdistances.py b/abcpy/jointdistances.py index 76b148ea..e4ed21ce 100644 --- a/abcpy/jointdistances.py +++ b/abcpy/jointdistances.py @@ -1,9 +1,6 @@ from abc import ABCMeta, abstractmethod import numpy as np -from glmnet import LogitNet -from sklearn import linear_model - class JointDistance(metaclass = ABCMeta): """This abstract base class defines how the combination of distances computed on the observed and @@ -131,4 +128,4 @@ def dist_max(self): combined_distance_max = 0.0 for ind in range(len(self.distances)): combined_distance_max += self.weights[ind]*self.distances[ind].dist_max() - return combined_distance_max \ No newline at end of file + return combined_distance_max diff --git a/abcpy/multilevel.py b/abcpy/multilevel.py new file mode 100644 index 00000000..ce1aa02d --- /dev/null +++ b/abcpy/multilevel.py @@ -0,0 +1,101 @@ +from abc import ABCMeta, abstractmethod + +import numpy as np +from glmnet import LogitNet +from sklearn import linear_model + + +class Multilevel(metaclass=ABCMeta): + """This abstract base class defines how the distance between the observed and + simulated data should be implemented. + """ + + @abstractmethod + def __init__(self, backend, data_thinner, criterion_calculator): + """The constructor of a sub-class must accept a non-optional data thinner and criterion + calculator as parameters. + + Parameters + ---------- + backend: abcpy.backend + Backend object + data_thinner : object + Object that operates on data and thins it + criterion_calculator: object + Object that operates on n_samples_per_param data and computes the criterion + """ + + self.bacend = backend + self.data_thinner = data_thinner + self.criterion_calculator = criterion_calculator + + raise NotImplementedError + + @abstractmethod + def compute(self, d, n_repeat): + """To be overwritten by any sub-class: should calculate the criterion for each + set of data_element in the lis data + + Notes + ----- + The data set d is an array-like structures that contain n data + points each. An implementation of the distance function should work along + the following steps: + + 1. Transform both input sets dX = [ dX1, dX2, ..., dXn ] to sX = [sX1, sX2, + ..., sXn] using the statistics object. See _calculate_summary_stat method. + + 2. Calculate the mutual desired distance, here denoted by -, between the + statstics dist = [s11 - s21, s12 - s22, ..., s1n - s2n]. + + Important: any sub-class must not calculate the distance between data sets + d1 and d2 directly. This is the reason why any sub-class must be + initialized with a statistics object. + + Parameters + ---------- + d: Python list + Contains n data points. + + + Returns + ------- + numpy.ndarray + The criterion calculated for each data point. + """ + + raise NotImplementedError + + ## Simple_map and Flat_map: Python wrapper for nested parallelization + def simple_map(self, data, map_function): + data_pds = self.backend.parallelize(data) + result_pds = self.backend.map(map_function, data_pds) + result = self.backend.collect(result_pds) + main_result, counter = [list(t) for t in zip(*result)] + return main_result, counter + + def flat_map(self, data, n_repeat, map_function): + # Create an array of data, with each data repeated n_repeat many times + repeated_data = np.repeat(data, n_repeat, axis=0) + # Create an see array + n_total = n_repeat * data.shape[0] + seed_arr = self.rng.randint(1, n_total * n_total, size=n_total, dtype=np.int32) + rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr]) + # Create data and rng array + repeated_data_rng = [[repeated_data[ind,:],rng_arr[ind]] for ind in range(n_total)] + repeated_data_rng_pds = self.backend.parallelize(repeated_data_rng) + # Map the function on the data using the corresponding rng + repeated_data_result_pds = self.backend.map(map_function, repeated_data_rng_pds) + repeated_data_result = self.backend.collect(repeated_data_result_pds) + repeated_data, result = [list(t) for t in zip(*repeated_data_result)] + merged_result_data = [] + for ind in range(0, data.shape[0]): + merged_result_data.append([[[result[np.int(i)][0][0] \ + for i in + np.where(np.mean(repeated_data == data[ind, :], axis=1) == 1)[0]]], + data[ind, :]]) + return merged_result_data + + +class Prototype(Multilevel): + \ No newline at end of file diff --git a/abcpy/perturbationkernel.py b/abcpy/perturbationkernel.py index ec0d5d23..4d920747 100644 --- a/abcpy/perturbationkernel.py +++ b/abcpy/perturbationkernel.py @@ -278,6 +278,7 @@ def calculate_cov(self, accepted_parameters_manager, kernel_index): cov = np.var(np.array(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index]).astype(float)) else: cov = np.cov(np.array(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index]).astype(float), rowvar=False) + return cov diff --git a/abcpy/probabilisticmodels.py b/abcpy/probabilisticmodels.py index b783f7bf..07bd956d 100644 --- a/abcpy/probabilisticmodels.py +++ b/abcpy/probabilisticmodels.py @@ -681,7 +681,7 @@ def _check_output(self, values): @abstractmethod - def forward_simulate(self, input_values, k, rng): + def forward_simulate(self, input_values, k, rng, mpi_comm): """ Provides the output (pseudo data) from a forward simulation of the current model. @@ -843,7 +843,7 @@ def get_input_values(self): return [] - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): return [np.array(self._fixed_values) for _ in range(k)] @@ -889,7 +889,7 @@ def __init__(self, parameters, name=''): super(ModelResultingFromOperation, self).__init__(input_parameters, name) - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): raise NotImplementedError @@ -971,7 +971,7 @@ def sample_from_input_models(self, k, rng=np.random.RandomState()): class SummationModel(ModelResultingFromOperation): """This class represents all probabilistic models resulting from an addition of two probabilistic models""" - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """Adds the sampled values of both parent distributions. Parameters @@ -1015,7 +1015,7 @@ def forward_simulate(self, input_values, k, rng=np.random.RandomState()): class SubtractionModel(ModelResultingFromOperation): """This class represents all probabilistic models resulting from an subtraction of two probabilistic models""" - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """Adds the sampled values of both parent distributions. Parameters @@ -1057,7 +1057,7 @@ def forward_simulate(self, input_values, k, rng=np.random.RandomState()): class MultiplicationModel(ModelResultingFromOperation): """This class represents all probabilistic models resulting from a multiplication of two probabilistic models""" - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """Multiplies the sampled values of both parent distributions element wise. Parameters @@ -1099,7 +1099,7 @@ def forward_simulate(self, input_values, k, rng=np.random.RandomState()): class DivisionModel(ModelResultingFromOperation): """This class represents all probabilistic models resulting from a division of two probabilistic models""" - def forward_simulate(self, input_valus, k, rng=np.random.RandomState()): + def forward_simulate(self, input_valus, k, rng=np.random.RandomState(), mpi_comm=None): """Divides the sampled values of both parent distributions. Parameters @@ -1161,7 +1161,7 @@ def _check_input(self, input_values): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """Raises the sampled values of the base by the exponent. Parameters @@ -1223,7 +1223,7 @@ def _check_input(self, input_values): return True - def forward_simulate(self, input_values, k, rng=np.random.RandomState()): + def forward_simulate(self, input_values, k, rng=np.random.RandomState(), mpi_comm=None): """Raises the base by the sampled value of the exponent. Parameters diff --git a/doc/source/installation.rst b/doc/source/installation.rst index 2e5f0293..24b05a43 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -34,7 +34,9 @@ To create a package and install it, do :: make package - pip3 install build/dist/abcpy-0.5.4-py3-none-any.whl + + pip3 install build/dist/abcpy-0.5.5-py3-none-any.whl + Note that ABCpy requires Python3. diff --git a/doc/source/parallelization.rst b/doc/source/parallelization.rst index 9df905cd..042ed819 100644 --- a/doc/source/parallelization.rst +++ b/doc/source/parallelization.rst @@ -10,12 +10,93 @@ Running ABC algorithms is often computationally expensive, thus ABCpy is built with parallelization in mind. In order to run your inference schemes in parallel on multiple nodes (computers) you can choose from the following backends. +Using the MPI Backend +~~~~~~~~~~~~~~~~~~~~~ + +To run ABCpy in parallel using MPI, one only needs to use the provided MPI +backend. Using the same example as before, the statements for the backend have to +be changed to + +.. literalinclude:: ../../examples/backends/mpi/pmcabc_gaussian.py + :language: python + :lines: 6-10 + :dedent: 4 + +In words, one only needs to initialize an instance of the MPI backend. The +number of ranks to spawn are specified at runtime through the way the script is +run. A minimum of two ranks is required, since rank 0 (master) is used to +orchestrate the calculation and all other ranks (workers) actually perform the +calculation. (The default value of `process_per_model` is 1. If your simulator +model is not parallelized using MPI, do not specify +`process_per_model > 1`. The use of `process_per_model` for nested parallelization +will be explained below.) + +The standard way to run the script using MPI is directly via mpirun like below +or on a cluster through a job scheduler like Slurm: + +:: + + mpirun -np 4 python3 pmcabc_gaussian.py + + +The adapted Python code can be found in +`examples/backend/mpi/pmcabc_gaussian.py`. + +Nested-MPI parallelization for MPI-parallelized simulator models +------------------------------------------------------------------ + +Sometimes, the simulator model itself has +large compute requirements and needs parallelization. To achieve this parallelization +using threads, the MPI backend need to be configured such that each MPI +rank can spawn multiple threads on a node. However, there might be situations +where node-local parallelization using threads is not sufficient and +parallelization across nodes is required. + +Parallelization of the forward model across nodes is possible *but limited* to +the MPI backend. Technically, this is implemented using individual MPI +communicators for each forward model. The number of ranks per communicator +(defined as: `process_per_model`) +can be passed at the initialization of the backend as follows: + +.. literalinclude:: ../../examples/backends/mpi/mpi_model_inferences.py + :language: python + :lines: 10-11 + :dedent: 4 + +Here each model is assigned a MPI communicator with 2 ranks. Clearly, the MPI +job has to be configured manually such that the total amount of MPI ranks is ideally +a multiple of the ranks per communicator plus one additional rank for the +master. For example, if we want to run n instances of a MPI model and allows m +processes to each instance, we will have to spawn (n*m)+1 ranks. + +For `forward_simulation` of the MPI-parallelized simulator model has to be able +to take an MPI communicator as a parameter. + +An example of an MPI-parallelized simulator model, which can be used with ABCpy +nested-parallelization, can be found in `examples/backend/mpi/mpi_model_inferences.py`. +The `forward_simulation` function of the above model is as follows: + +.. literalinclude:: ../../examples/backends/mpi/mpi_model_inferences.py + :language: python + :lines: 48-77 + :dedent: 4 + +Note that in order to run jobs in parallel you need to have MPI installed on the +system(s) in question with the requisite Python bindings for MPI (mpi4py). The +dependencies of the MPI backend can be install with +`pip install -r requirements/backend-mpi.txt`. + +Details on the installation can be found on the official `Open MPI homepage +`_ and the `mpi4py homepage +`_. Further, keep in mind that the ABCpy library has +to be properly installed on the cluster, such that it is available to the Python +interpreters on the master and the worker nodes. Using the Spark Backend ~~~~~~~~~~~~~~~~~~~~~~~ To run ABCpy in parallel using Apache Spark, one only needs to use the provided -Spark backend. Considering the example from above, the statements for the +Spark backend. Considering the example from before, the statements for the backend have to be changed to .. literalinclude:: ../../examples/backends/apache_spark/pmcabc_gaussian.py @@ -51,46 +132,6 @@ Details on the installation can be found on the official `homepage be properly installed on the cluster, such that it is available to the Python interpreters on the master and the worker nodes. -Using the MPI Backend -~~~~~~~~~~~~~~~~~~~~~ - -To run ABCpy in parallel using MPI, one only needs to use the provided MPI -backend. Using the same example as above, the statements for the backend have to -be changed to - -.. literalinclude:: ../../examples/backends/mpi/pmcabc_gaussian.py - :language: python - :lines: 6-7 - :dedent: 4 - -In words, one only needs to initialize an instance of the MPI backend. The -number of ranks to spawn are specified at runtime through the way the script is -run. A minimum of two ranks is required, since rank 0 (master) is used to -orchestrate the calculation and all other ranks (workers) actually perform the -calculation. - -The standard way to run the script using Open MPI is directly via mpirun like below -or on a cluster through a job scheduler like Slurm: - -:: - - mpirun -np 4 python3 pmcabc_gaussian.py - - -The adapted Python code can be found in -`examples/backend/mpi/pmcabc_gaussian.py`. - -Note that in order to run jobs in parallel you need to have MPI installed on the -system(s) in question with the requisite Python bindings for MPI (mpi4py). The -dependencies of the MPI backend can be install with -`pip install -r requirements/backend-mpi.txt`. - -Details on the installation can be found on the official `Open MPI homepage -`_ and the `mpi4py homepage -`_. Further, keep in mind that the ABCpy library has -to be properly installed on the cluster, such that it is available to the Python -interpreters on the master and the worker nodes. - Using Cluster Infrastructure ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/examples/backends/mpi/mpi_model_inferences.py b/examples/backends/mpi/mpi_model_inferences.py new file mode 100644 index 00000000..9134390c --- /dev/null +++ b/examples/backends/mpi/mpi_model_inferences.py @@ -0,0 +1,337 @@ +#import logging +#logging.basicConfig(level=logging.DEBUG) + +import numpy as np +from abcpy.probabilisticmodels import ProbabilisticModel, InputConnector + +def setup_backend(): + global backend + + from abcpy.backends import BackendMPI as Backend + backend = Backend(process_per_model=2) + +class NestedBivariateGaussian(ProbabilisticModel): + """ + This is a show case model of bi-variate Gaussian distribution where we assume + the standard deviation to be unit. + """ + + def __init__(self, parameters, name='Gaussian'): + # We expect input of type parameters = [mu, sigma] + if not isinstance(parameters, list): + raise TypeError('Input of Normal model is of type list') + + if len(parameters) != 2: + raise RuntimeError('Input list must be of length 2, containing [mu1, mu1].') + + input_connector = InputConnector.from_list(parameters) + super().__init__(input_connector, name) + + def _check_input(self, input_values): + # Check whether input has correct type or format + if len(input_values) != 2: + raise ValueError('Number of parameters are 2 (two means).') + return True + + def _check_output(self, values): + if not isinstance(values, np.ndarray): + raise ValueError('Output of the normal distribution is always a numpy array.') + + if values.shape[0] != 2: + raise ValueError('Output shape should be of dimension 2.') + + return True + + def get_output_dimension(self): + return 2 + + def forward_simulate(self, input_values, k, rng=np.random.RandomState, mpi_comm=None): + if mpi_comm is None: + ValueError('MPI-parallelized simulator model needs to have access \ + to a MPI communicator object') + #print("Start Forward Simulate on rank {}".format(mpi_comm.Get_rank())) + rank = mpi_comm.Get_rank() + # Extract the input parameters + mu = input_values[rank] + sigma = 1 + # Do the actual forward simulation + vector_of_k_samples = np.array(rng.normal(mu, sigma, k)) + + # Send everything back to rank 0 + data = mpi_comm.gather(vector_of_k_samples, root=0) + + # Format the output to obey API and broadcast it before return + result = None + if rank == 0: + result = [None] * k + for i in range(k): + element0 = data[0][i] + element1 = data[1][i] + point = np.array([element0, element1]) + result[i] = point + result = [np.array([result[i]]).reshape(-1, ) for i in range(k)] + #print("End forward sim on master") + return result + else: + #print("End forward sim on workers") + return None + +def infer_parameters_pmcabc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import PMCABC + sampler = PMCABC([height_weight_model], [distance_calculator], backend, seed=1) + # sample from scheme + T, n_sample, n_samples_per_param = 2, 10, 1 + eps_arr = np.array([10000]) + epsilon_percentile = 95 + + journal = sampler.sample([y_obs], T, eps_arr, n_sample, n_samples_per_param, epsilon_percentile) + + return journal + +def infer_parameters_abcsubsim(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import ABCsubsim + sampler = ABCsubsim([height_weight_model], [distance_calculator], backend) + steps, n_samples, n_samples_per_param, chain_length = 2, 10, 1, 2 + journal = sampler.sample([y_obs], steps, n_samples, n_samples_per_param, chain_length) + + return journal + +def infer_parameters_rsmcabc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import RSMCABC + sampler = RSMCABC([height_weight_model], [distance_calculator], backend, seed=1) + print('sampling') + steps, n_samples, n_samples_per_param, alpha, epsilon_init, epsilon_final = 2, 10, 1, 0.1, 10000, 500 + print('RSMCABC Inferring') + journal = sampler.sample([y_obs], steps, n_samples, n_samples_per_param, alpha , epsilon_init, epsilon_final,full_output=1) + + return journal + +def infer_parameters_sabc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import SABC + sampler = SABC([height_weight_model], [distance_calculator], backend, seed=1) + print('sampling') + steps, epsilon, n_samples, n_samples_per_param, beta, delta, v = 2, np.array([40000]), 10, 1, 2, 0.2, 0.3 + ar_cutoff, resample, n_update, adaptcov, full_output = 0.1, None, None, 1, 1 + # + # # print('SABC Inferring') + journal = sampler.sample([y_obs], steps, epsilon, n_samples, n_samples_per_param, beta, delta, v, ar_cutoff, resample, n_update, adaptcov, full_output) + + return journal + +def infer_parameters_apmcabc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import APMCABC + sampler = APMCABC([height_weight_model], [distance_calculator], backend, seed=1) + steps, n_samples, n_samples_per_param, alpha, acceptance_cutoff, covFactor, full_output, journal_file = 2, 100, 1, 0.2, 0.03, 2.0, 1, None + journal = sampler.sample([y_obs], steps, n_samples, n_samples_per_param, alpha, acceptance_cutoff, covFactor, full_output, journal_file) + + return journal + +def infer_parameters_rejectionabc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import RejectionABC + sampler = RejectionABC([height_weight_model], [distance_calculator], backend, seed=1) + n_samples, n_samples_per_param, epsilon = 2, 1, 20000 + journal = sampler.sample([y_obs], n_samples, n_samples_per_param, epsilon) + + return journal + +def infer_parameters_smcabc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + # define distance + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import SMCABC + sampler = SMCABC([height_weight_model], [distance_calculator], backend, seed=1) + steps, n_samples, n_samples_per_param, epsilon = 2, 10, 1, 2000 + journal = sampler.sample([y_obs], steps, n_samples, n_samples_per_param, epsilon, full_output=1) + + return journal + +def infer_parameters_pmc(): + # define observation for true parameters mean=170, 65 + rng = np.random.RandomState() + y_obs = [np.array(rng.multivariate_normal([170, 65], np.eye(2), 1).reshape(2,))] + + # define prior + from abcpy.continuousmodels import Uniform + mu0 = Uniform([[150], [200]], ) + mu1 = Uniform([[25], [100]], ) + + # define the model + height_weight_model = NestedBivariateGaussian([mu0, mu1]) + + # define statistics + from abcpy.statistics import Identity + statistics_calculator = Identity(degree = 2, cross = False) + + from abcpy.approx_lhd import SynLiklihood + approx_lhd = SynLiklihood(statistics_calculator) + + # define sampling scheme + from abcpy.inferences import PMC + sampler = PMC([height_weight_model], [approx_lhd], backend, seed=1) + + # sample from scheme + T, n_sample, n_samples_per_param = 2, 10, 10 + + journal = sampler.sample([y_obs], T, n_sample, n_samples_per_param) + + return journal + + +def setUpModule(): + setup_backend() + +if __name__ == "__main__": + setup_backend() + print('True Value was: ' + str([170, 65])) + print('Posterior Mean of PMCABC: ' + str(infer_parameters_pmcabc().posterior_mean())) + print('Posterior Mean of ABCsubsim: ' + str(infer_parameters_abcsubsim().posterior_mean())) + print('Posterior Mean of RSMCABC: ' + str(infer_parameters_rsmcabc().posterior_mean())) + print('Posterior Mean of SABC: ' + str(infer_parameters_sabc().posterior_mean())) + print('Posterior Mean of SMCABC: ' + str(infer_parameters_smcabc().posterior_mean())) + print('Posterior Mean of APMCABC: ' + str(infer_parameters_apmcabc().posterior_mean())) + print('Posterior Mean of RejectionABC: ' + str(infer_parameters_rejectionabc().posterior_mean())) + print('Posterior Mean of PMC: ' + str(infer_parameters_pmc().posterior_mean())) diff --git a/examples/backends/mpi/pmcabc_gaussian.py b/examples/backends/mpi/pmcabc_gaussian.py index e1b2250d..29d5b540 100644 --- a/examples/backends/mpi/pmcabc_gaussian.py +++ b/examples/backends/mpi/pmcabc_gaussian.py @@ -5,6 +5,10 @@ def setup_backend(): from abcpy.backends import BackendMPI as Backend backend = Backend() + # The above line is equivalent to: + # backend = Backend(process_per_model=1) + # Notice: Models not parallelized by MPI should not be given process_per_model > 1 + def infer_parameters(): @@ -25,17 +29,17 @@ def infer_parameters(): statistics_calculator = Identity(degree = 2, cross = False) # define distance - from abcpy.distances import LogReg - distance_calculator = LogReg(statistics_calculator) + from abcpy.distances import Euclidean + distance_calculator = Euclidean(statistics_calculator) # define sampling scheme from abcpy.inferences import PMCABC sampler = PMCABC([height], [distance_calculator], backend, seed=1) # sample from scheme - T, n_sample, n_samples_per_param = 3, 250, 10 - eps_arr = np.array([.75]) - epsilon_percentile = 10 + T, n_sample, n_samples_per_param = 2, 10, 1 + eps_arr = np.array([10000]) + epsilon_percentile = 95 journal = sampler.sample([y_obs], T, eps_arr, n_sample, n_samples_per_param, epsilon_percentile) return journal @@ -62,7 +66,6 @@ def analyse_journal(journal): import unittest -from mpi4py import MPI def setUpModule(): ''' @@ -83,7 +86,7 @@ class ExampleGaussianMPITest(unittest.TestCase): def test_example(self): journal = infer_parameters() test_result = journal.posterior_mean()[0] - expected_result = 178.07690877694714 + expected_result = 171.4343638312893 self.assertLess(abs(test_result - expected_result), 2) diff --git a/tests/backend_tests_mpi.py b/tests/backend_tests_mpi.py index 2c11f116..d0b81e9d 100644 --- a/tests/backend_tests_mpi.py +++ b/tests/backend_tests_mpi.py @@ -10,12 +10,12 @@ def setUpModule(): If an exception is raised in a setUpModule then none of the tests in the module will be run. - This is useful because the slaves run in a while loop on initialization - only responding to the master's commands and will never execute anything else. + This is useful because the teams run in a while loop on initialization + only responding to the scheduler's commands and will never execute anything else. - On termination of master, the slaves call quit() that raises a SystemExit(). + On termination of scheduler, the teams call quit() that raises a SystemExit(). Because of the behaviour of setUpModule, it will not run any unit tests - for the slave and we now only need to write unit-tests from the master's + for the team and we now only need to write unit-tests from the scheduler's point of view. ''' global rank,backend_mpi @@ -26,13 +26,13 @@ def setUpModule(): class MPIBackendTests(unittest.TestCase): def test_parallelize(self): - data = [0]*backend_mpi.size + data = [0]*backend_mpi.size() pds = backend_mpi.parallelize(data) pds_map = backend_mpi.map(lambda x: x + MPI.COMM_WORLD.Get_rank(), pds) res = backend_mpi.collect(pds_map) - for master_index in backend_mpi.master_node_ranks: - self.assertTrue(master_index not in res,"Node in master_node_ranks performed map.") + for scheduler_index in backend_mpi.scheduler_node_ranks(): + self.assertTrue(scheduler_index not in res,"Node in scheduler_node_ranks performed map.") def test_map(self): data = [1,2,3,4,5] @@ -48,7 +48,7 @@ def test_broadcast(self): bds = backend_mpi.broadcast(100) - #Pollute the BDS values of the master to confirm slaves + #Pollute the BDS values of the scheduler to confirm teams # use their broadcasted value for k,v in backend_mpi.bds_store.items(): backend_mpi.bds_store[k] = 99999 @@ -68,13 +68,13 @@ def check_if_exists(x): data = [1,2,3,4,5] pds = backend_mpi.parallelize(data) - #Check if the pds we just created exists in all the slaves(+master) + #Check if the pds we just created exists in all the teams(+scheduler) id_check_pds = backend_mpi.parallelize([pds.pds_id]*5) pds_check_result = backend_mpi.map(check_if_exists, id_check_pds) self.assertTrue(False not in backend_mpi.collect(pds_check_result),"PDS was not created") - #Delete the PDS on master and try again + #Delete the PDS on scheduler and try again del pds pds_check_result = backend_mpi.map(check_if_exists,id_check_pds) @@ -90,12 +90,12 @@ def check_if_exists(x): data = [1,2,3,4,5] bds = backend_mpi.broadcast(data) - #Check if the pds we just created exists in all the slaves(+master) + #Check if the pds we just created exists in all the teams(+scheduler) id_check_bds = backend_mpi.parallelize([bds.bds_id]*5) bds_check_result = backend_mpi.map(check_if_exists, id_check_bds) self.assertTrue(False not in backend_mpi.collect(bds_check_result),"BDS was not created") - #Delete the PDS on master and try again + #Delete the PDS on scheduler and try again del bds bds_check_result = backend_mpi.map(check_if_exists,id_check_bds) self.assertTrue(True not in backend_mpi.collect(bds_check_result),"BDS was not deleted") diff --git a/tests/backend_tests_mpi_model_mpi.py b/tests/backend_tests_mpi_model_mpi.py new file mode 100644 index 00000000..e6b7b1aa --- /dev/null +++ b/tests/backend_tests_mpi_model_mpi.py @@ -0,0 +1,149 @@ +import unittest +from mpi4py import MPI +from abcpy.backends import BackendMPI,BackendMPITestHelper +import numpy + +def setUpModule(): + ''' + If an exception is raised in a setUpModule then none of + the tests in the module will be run. + + This is useful because the teams run in a while loop on initialization + only responding to the scheduler's commands and will never execute anything else. + + On termination of scheduler, the teams call quit() that raises a SystemExit(). + Because of the behaviour of setUpModule, it will not run any unit tests + for the team and we now only need to write unit-tests from the scheduler's + point of view. + ''' + global rank,backend_mpi + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + backend_mpi = BackendMPI(process_per_model=2) + +class MPIBackendTests(unittest.TestCase): + + def test_parallelize(self): + data = [0]*backend_mpi.size() + pds = backend_mpi.parallelize(data) + pds_map = backend_mpi.map(lambda x, npc=None: x + MPI.COMM_WORLD.Get_rank(), pds) + res = backend_mpi.collect(pds_map) + + for scheduler_index in backend_mpi.scheduler_node_ranks(): + self.assertTrue(scheduler_index not in res,"Node in scheduler_node_ranks performed map.") + + def test_map(self): + def square_mpi(x, npc=None): + local_res = numpy.array([2*(x**2)], 'i') + #global_res = numpy.array([0], 'i') + #MPI.COMM_WORLD.Reduce([local_res,MPI.INT], [global_res,MPI.INT], op=MPI.SUM, root=0) + return local_res[0] + + data = [1,2,3,4,5] + pds = backend_mpi.parallelize(data) + pds_map = backend_mpi.map(square_mpi, pds) + res = backend_mpi.collect(pds_map) + assert res==list(map(lambda x:2*(x**2),data)) + + + def test_broadcast(self): + data = [1,2,3,4,5] + pds = backend_mpi.parallelize(data) + + bds = backend_mpi.broadcast(100) + + #Pollute the BDS values of the scheduler to confirm teams + # use their broadcasted value + for k,v in backend_mpi.bds_store.items(): + backend_mpi.bds_store[k] = 99999 + + def test_map(x, npc=None): + return x + bds.value() + + pds_m = backend_mpi.map(test_map, pds) + self.assertTrue(backend_mpi.collect(pds_m)==[101,102,103,104,105]) + + def test_pds_delete(self): + + def check_if_exists(x, npc): + obj = BackendMPITestHelper() + if npc.communicator().Get_rank() == 0: + return obj.check_pds(x) + return None + + data = [1,2,3,4,5] + pds = backend_mpi.parallelize(data) + + #Check if the pds we just created exists in all the teams(+scheduler) + + id_check_pds = backend_mpi.parallelize([pds.pds_id]*5) + pds_check_result = backend_mpi.map(check_if_exists, id_check_pds) + self.assertTrue(False not in backend_mpi.collect(pds_check_result),"PDS was not created") + + #Delete the PDS on scheduler and try again + del pds + pds_check_result = backend_mpi.map(check_if_exists,id_check_pds) + + self.assertTrue(True not in backend_mpi.collect(pds_check_result),"PDS was not deleted") + + + def test_bds_delete(self): + + def check_if_exists(x, npc=None): + obj = BackendMPITestHelper() + return obj.check_bds(x) + + data = [1,2,3,4,5] + bds = backend_mpi.broadcast(data) + + #Check if the pds we just created exists in all the teams(+scheduler) + id_check_bds = backend_mpi.parallelize([bds.bds_id]*5) + bds_check_result = backend_mpi.map(check_if_exists, id_check_bds) + self.assertTrue(False not in backend_mpi.collect(bds_check_result),"BDS was not created") + + #Delete the PDS on scheduler and try again + del bds + bds_check_result = backend_mpi.map(check_if_exists,id_check_bds) + self.assertTrue(True not in backend_mpi.collect(bds_check_result),"BDS was not deleted") + + + def test_function_pickle(self): + + def square_mpi(x, npc=None): + local_res = numpy.array([2*(x**2)], 'i') + #global_res = numpy.array([0], 'i') + #model_comm.Reduce([local_res,MPI.INT], [global_res,MPI.INT], op=MPI.SUM, root=0) + return local_res[0] + + class staticfunctest_mpi: + @staticmethod + def square_mpi(x, npc=None): + local_res = numpy.array([2*(x**2)], 'i') + #global_res = numpy.array([0], 'i') + #model_comm.Reduce([local_res,MPI.INT], [global_res,MPI.INT], op=MPI.SUM, root=0) + return local_res[0] + + class nonstaticfunctest_mpi: + def square_mpi(self, x, npc=None): + local_res = numpy.array([2*(x**2)], 'i') + #global_res = numpy.array([0], 'i') + #model_comm.Reduce([local_res,MPI.INT], [global_res,MPI.INT], op=MPI.SUM, root=0) + return local_res[0] + + data = [1,2,3,4,5] + expected_result = [2,8,18,32,50] + + pds = backend_mpi.parallelize(data) + pds_map1 = backend_mpi.map(square_mpi,pds) + pds_res1 = backend_mpi.collect(pds_map1) + + self.assertTrue(pds_res1==expected_result,"Failed pickle test for general function") + + pds_map3 = backend_mpi.map(staticfunctest_mpi.square_mpi,pds) + pds_res3 = backend_mpi.collect(pds_map3) + self.assertTrue(pds_res3==expected_result,"Failed pickle test for static function") + + obj = nonstaticfunctest_mpi() + pds_map4 = backend_mpi.map(obj.square_mpi ,pds) + pds_res4 = backend_mpi.collect(pds_map4) + self.assertTrue(pds_res4==expected_result,"Failed pickle test for non-static function") diff --git a/tests/inferences_tests.py b/tests/inferences_tests.py index 4308324d..f49633d9 100644 --- a/tests/inferences_tests.py +++ b/tests/inferences_tests.py @@ -91,8 +91,8 @@ def test_sample(self): self.assertEqual(mu_sample_shape, (10,1)) self.assertEqual(sigma_sample_shape, (10,1)) self.assertEqual(weights_sample_shape, (10,1)) - self.assertLess(abs(mu_post_mean - (-3.402868)), 1e-3) - self.assertLess(abs(sigma_post_mean - 6.212), 1e-3) + self.assertLess(abs(mu_post_mean - (-3.56042761)), 1e-3) + self.assertLess(abs(sigma_post_mean - 5.7553691), 1e-3) self.assertFalse(journal.number_of_simulations == 0) @@ -111,8 +111,8 @@ def test_sample(self): self.assertEqual(mu_sample_shape, (10,1)) self.assertEqual(sigma_sample_shape, (10,1)) self.assertEqual(weights_sample_shape, (10,1)) - self.assertLess(abs(mu_post_mean - (-3.03325763) ), 1e-3) - self.assertLess(abs(sigma_post_mean - 6.92124735), 1e-3) + self.assertLess(abs(mu_post_mean - (-3.25971092) ), 1e-3) + self.assertLess(abs(sigma_post_mean - 7.76172201), 1e-3) self.assertFalse(journal.number_of_simulations == 0)