
Commit ce49a4e

Feat: Improve doc (#2)
1 parent b1145ac commit ce49a4e

2 files changed (+47, -7 lines)


AUTHORS.md

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ Adrien Berchet (@adrien-berchet)
# Contributors

* Alexis Arnaudon (@arnaudon)
+* Gianluca Ficarelli (@GianlucaFicarelli)

README.md

Lines changed: 46 additions & 7 deletions
@@ -4,7 +4,7 @@ Provides an embarrassingly parallel tool with sql backend.

## Introduction

-Provides an embarrassingly parallel tool with sql backend, inspired by [BluePyMM](https://github.com/BlueBrain/BluePyMM).
+Provides an embarrassingly parallel tool with sql backend, inspired by [BluePyMM](https://github.com/BlueBrain/BluePyMM) from @wvangeit.


## Installation
@@ -41,11 +41,34 @@ mapper = parallel_factory.get_mapper()
result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
```

-
-### Working with Pandas and SQL backend
+### Working with Pandas

This library provides a specific function, :func:`bluepyparallel.evaluator.evaluate`, for working with large :class:`pandas.DataFrame` objects.
This function converts the DataFrame into a list of dicts (one for each row), then maps a given function to each element and finally gathers the results.
+
+Example:
+
+```python
+import pandas as pd
+
+from bluepyparallel.evaluator import evaluate
+
+input_df = pd.DataFrame(index=[1, 2], columns=['data'], data=[100, 200])
+
+def evaluation_function(row):
+    result_1, result_2 = compute_something(row['data'])
+    return {'new_column_1': result_1, 'new_column_2': result_2}
+
+# Map the given function to each row of the DataFrame
+result_df = evaluate(
+    input_df,  # This is the DataFrame to process
+    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
+    parallel_factory="multiprocessing",  # This could also be a previously defined Factory
+    new_columns=[['new_column_1', 0], ['new_column_2', None]],  # This defines the new columns and their default values
+)
+assert result_df.columns.tolist() == ['data', 'new_column_1', 'new_column_2']
+```
+
+It is, in a way, a generalisation of the pandas `.apply` method.
+
+
+### Working with an SQL backend
+
As it aims at working with time-consuming functions, it also provides a checkpoint and resume mechanism using a SQL backend.
The SQL backend uses the [SQLAlchemy](https://docs.sqlalchemy.org) library, so it can work with a large variety of database types (like SQLite, PostgreSQL, MySQL, ...).
To activate this feature, just pass a [URL that can be processed by SQLAlchemy](https://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=url#database-urls) to the ``db_url`` parameter of :func:`bluepyparallel.evaluator.evaluate`.
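
As an illustrative sketch (the SQLite file name below is only an assumption; any SQLAlchemy-compatible URL works), activating the backend could look like:

```python
result_df = evaluate(
    input_df,
    evaluation_function,
    parallel_factory="multiprocessing",
    new_columns=[['new_column_1', 0], ['new_column_2', None]],
    db_url="sqlite:///evaluation.db",  # assumed file name; any SQLAlchemy URL is accepted
)
# Re-running the same call with the same db_url resumes from the already computed rows,
# so only the missing elements are evaluated again.
```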
@@ -69,10 +92,11 @@ If the crash was due to an external cause (therefore executing the code again sh
computation from the last computed element. Thus, only the missing elements are computed, which can save a lot of time.


-## Running using Dask
+## Running with distributed Dask MPI on HPC systems

-This is an example of a [sbatch](https://slurm.schedmd.com/sbatch.html) script that can be adapted to execute the script using multiple nodes and workers.
-In this example, the code called by the ``<command>`` should parallelized using BluePyParallel.
+This is an example of a [sbatch](https://slurm.schedmd.com/sbatch.html) script that can be
+adapted to execute the script using multiple nodes and workers with distributed Dask and MPI.
+In this example, the code called by ``run.py`` should be parallelized using BluePyParallel.

Dask variables are not strictly required, but highly recommended, and they can be fine-tuned.

@@ -96,9 +120,24 @@ export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms # Time between starti
# Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
export PARALLEL_BATCH_SIZE=1000

-srun -v <command>
+srun -v python run.py
```

+To ensure that only the `evaluate` function is run in parallel with Dask, one has to initialise the parallel factory
+before anything else is done in the code. For example, ``run.py`` could look like:
+
+```python
+if __name__ == "__main__":
+    parallel_factory = init_parallel_factory('dask_dataframe')
+    df = pd.read_csv("input_data.csv")
+    df = some_preprocessing(df)
+    df = evaluate(df, function_to_evaluate, parallel_factory=parallel_factory)
+    df.to_csv("output_data.csv")
+```
+
+This is because everything before `init_parallel_factory` is run in parallel by every MPI rank, as MPI is not initialized yet at that point.
+
+.. note:: We recommend using `dask_dataframe` instead of `dask`, as it is in practice more stable for large computations.

## Funding & Acknowledgment
