-
Notifications
You must be signed in to change notification settings - Fork 6
/
parallel-basics.Rmd
871 lines (623 loc) · 40.5 KB
/
parallel-basics.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
Basics of Parallel Processing in R, Python, Matlab, and C
==============================================================
Threaded linear algebra and parallel for loops in a shared memory (single machine) context
--------------
Chris Paciorek, Department of Statistics, UC Berkeley
```{r setup, include=FALSE}
library(knitr)
library(stringr)
read_chunk('parallel-basics.R')
read_chunk('parallel-basics.py')
read_chunk('parallel-basics.m')
```
# 0) This Tutorial
This tutorial covers basic strategies for using parallel processing in R, Python, Matlab, and C on single machine in which multiple processors (cores) share memory.
You should be able to replicate much of what is covered here provided you have R, Python, or Matlab installed on your computer, but some of the parallelization approaches may not work on Windows.
This tutorial was developed using a virtual machine developed here at Berkeley, the [Berkeley Common Environment (BCE)](http://bce.berkeley.edu). BCE is a virtual Linux machine - basically it is a Linux computer that you can run within your own computer, regardless of whether you are using Windows, Mac, or Linux. This provides a common environment so that things behave the same for all of us. You're welcome to try using BCE with this tutorial, but I should note that we haven't updated BCE recently and the VirtualBox software on which BCE will run can be a pain.
This tutorial assumes you have a working knowledge of either R, Python, Matlab, or C.
Materials for this tutorial, including the R markdown file and associated code files that were used to create this document are available on Github at https://github.com/berkeley-scf/tutorial-parallel-basics. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:
```{r, clone, eval=FALSE}
git clone https://github.com/berkeley-scf/tutorial-parallel-basics
```
To create this HTML document, simply compile the corresponding R Markdown file in R as follows (the following will work from within BCE after cloning the repository as above).
```{r, build-html, eval=FALSE}
Rscript -e "library(knitr); knit2html('parallel-basics.Rmd')"
```
This tutorial by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.
# 1) Types of parallel processing
There are two basic flavors of parallel processing (leaving aside
GPUs): distributed memory and shared memory. With shared memory, multiple
processors (which I'll call cores) share the same memory. With distributed
memory, you have multiple nodes, each with their own memory. You can
think of each node as a separate computer connected by a fast network.
## 1.1) Some useful terminology:
- *cores*: We'll use this term to mean the different processing
units available on a single node.
- *nodes*: We'll use this term to mean the different computers,
each with their own distinct memory, that make up a cluster or supercomputer.
- *processes*: computational tasks executing on a machine; multiple
processes may be executing at once. A given program may start up multiple
processes at once. Ideally we have no more processes than cores on
a node.
- *threads*: multiple paths of execution within a single process;
the OS sees the threads as a single process, but one can think of
them as 'lightweight' processes. Ideally when considering the processes
and their threads, we would have no more processes and threads combined
than cores on a node.
- *forking*: child processes are spawned that are identical to
the parent, but with different process IDs and their own memory.
- *sockets*: some of R's parallel functionality involves creating
new R processes (e.g., starting processes via *Rscript*) and
communicating with them via a communication technology called sockets.
## 1.2) Shared memory
For shared memory parallelism, each core is accessing the same memory
so there is no need to pass information (in the form of messages)
between different machines. But in some programming contexts one needs
to be careful that activity on different cores doesn't mistakenly
overwrite places in memory that are used by other cores.
We'll cover two types of shared memory parallelism approaches:
- threaded linear algebra
- multicore functionality
### Threading
Threads are multiple paths of execution within a single process. If you are monitoring CPU
usage (such as with *top* in Linux or Mac) and watching a job that is executing threaded code, you'll
see the process using more than 100% of CPU. When this occurs, the
process is using multiple cores, although it appears as a single process
rather than as multiple processes.
Note that this is a different notion than a processor that is hyperthreaded. With hyperthreading a single core appears as two cores to the operating system.
## 1.3) Distributed memory
Parallel programming for distributed memory parallelism requires passing
messages between the different nodes. The standard protocol for doing
this is MPI, of which there are various versions, including *openMPI*.
The R package *Rmpi* implements MPI in R. The *pbd* packages for R also implement MPI as well as distributed linear algebra (linear algebra calculations across nodes). In addition, there are various ways to do simple parallelization of multiple computational tasks (across multiple nodes) that use MPI and other tools on the back-end without users needing to understand them.
Python has a package *mpi4py* that allows use of MPI within Python. And the IPython parallel tools allow one to do simple parallelization of multiple computational tasks across multiple nodes (as do other Python packages as well).
MATLAB has its own system for distributed computation, called the Distributed Computing Server (DCS), requiring additional licensing above the standard MATLAB installation.
This tutorial will not cover distributed memory parallelization; please see our [tutorial on parallel processing in a distributed context](https://github.com/berkeley-scf/tutorial-parallel-distributed).
## 1.4) Other type of parallel processing
We won't cover either of these in this material.
### GPUs
GPUs (Graphics Processing Units) are processing units originally designed
for rendering graphics on a computer quickly. This is done by having
a large number of simple processing units for massively parallel calculation.
The idea of general purpose GPU (GPGPU) computing is to exploit this
capability for general computation. In spring 2016, I gave a [workshop on using GPUs](http://statistics.berkeley.edu/computing/gpu).
Most researchers don't program for a GPU directly but rather use software (often machine learning software such as Tensorflow or Caffe) that has been programmed to take advantage of a GPU if one is available.
### Spark and Hadoop
Spark and Hadoop are systems for implementing computations in a distributed
memory environment, using the MapReduce approach.
# 2) Threading, particularly for linear algebra
# 2.1) What is the BLAS?
The BLAS is the library of basic linear algebra operations (written in
Fortran or C). A fast BLAS can greatly speed up linear algebra
relative to the default BLAS on a machine. Some fast BLAS libraries
are
- Intel's *MKL*; may be available for educational use for free
- *OpenBLAS* (formerly *GotoBLAS*); open source and free
- AMD's *ACML*; free
- *vecLib* for Macs; provided with your Mac
In addition to being fast when used on a single core, all of these BLAS libraries are
threaded - if your computer has multiple cores and there are free
resources, your linear algebra will use multiple cores, provided your
program is linked against the threaded BLAS installed on your machine and provided
OMP_NUM_THREADS is not set to one. (Macs make use of
VECLIB_MAXIMUM_THREADS rather than OMP_NUM_THREADS.)
On BCE, both R and (to some degree) Python are linked against OpenBLAS as of BCE-fall-2015.
## 2.2) Using threading
### 2.2.1) R
Threading in R is limited to linear algebra, provided R is linked against a threaded BLAS.
Here's some code that illustrates
the speed of using a threaded BLAS:
```{r, R-linalg, eval=FALSE}
```
Here the elapsed time indicates that using four threads gave us a two times (2x) speedup in terms of real time, while the user time indicates that the threaded calculation took a bit more total processing time (combining time across all processors) because of the overhead of using multiple threads.
Note that the code also illustrates use of an R package that can control the number of threads from within R.
### 2.2.2) Python
As with R, threading in Python is limited to linear algebra, provided Python is linked against a threaded BLAS. Python has something
called the [Global Interpreter Lock](https://wiki.python.org/moin/GlobalInterpreterLock)
that interferes with threading in Python (but not in threaded linear algebra packages called by Python).
Here's some linear algebra in Python that will use threading if *numpy* is linked against a threaded BLAS, though I don't compare the timing for different numbers of threads here.
```{r, py-linalg, engine='python', eval=FALSE}
```
### 2.2.3) MATLAB
Many MATLAB functions are automatically threaded (not just linear
algebra), so you don't need to do anything special in your code to
take advantage of this.
So if you're running MATLAB and monitoring CPU usage (e.g., using
*top* on Linux or OS X), you may see a process using more than 100% of CPU. However
worker tasks within a *parfor* (see Section 3) use only a single thread.
MATLAB uses MKL for linear algebra.
## 2.3) Fixing the number of threads (cores used)
In general, threaded code will
detect the number of cores available on a machine and make use of
them. However, you can also explicitly control the number of threads
available to a process.
### 2.3.1) R, Python, C, etc.
For most threaded code (that based on the openMP protocol), the number
of threads can be set by setting the OMP_NUM_THREADS environment
variable (VECLIB_MAXIMUM_THREADS on a Mac). E.g., to set it for four
threads in bash:
```export OMP_NUM_THREADS=4```
Do this before starting your R or Python session or before running your compiled executable.
Alternatively, you can set OMP_NUM_THREADS as you invoke your job, e.g., here with R:
```OMP_NUM_THREADS=4 R CMD BATCH --no-save job.R job.out```
### 2.3.2) MATLAB
MATLAB is an exception. Threading in MATLAB can be controlled
in two ways. From within your MATLAB code you can set the number of
threads, e.g., to four in this case:
```feature('numThreads', 4)```
To use only a single thread, you can use 1 instead of 4 above, or
you can start MATLAB with the *singleCompThread* flag:
```matlab -singleCompThread ...```
## 2.4) Important warnings about use of threaded BLAS
### 2.4.1) Speed and threaded BLAS
In many cases, using multiple threads for linear algebra operations
will outperform using a single thread, but there is no guarantee that
this will be the case, in particular for operations with small matrices
and vectors. Testing with openBLAS suggests that sometimes a job may
take more time when using multiple threads; this seems to be less
likely with ACML. This presumably occurs because openBLAS is not doing
a good job in detecting when the overhead of threading outweights
the gains from distributing the computations. You can compare speeds
by setting OMP_NUM_THREADS to different values. In cases where threaded
linear algebra is slower than unthreaded, you would want to set OMP_NUM_THREADS
to 1.
More generally, if you are using the parallel tools in Section 3 to
simultaneously carry out many independent calculations (tasks), it is
likely to be more effective to use the fixed number of cores available on your machine
so as to split up the tasks, one per core, without taking advantage of the threaded BLAS (i.e., restricting
each process to a single thread).
### 2.4.2) Conflict between openBLAS and some parallel functionality in R
There are conflicts between forking in R and threaded BLAS that in
some cases have affected:
- *foreach* (when using the *parallel* (and *multicore*) backends),
- *mclapply*, and
- *parLapply* and *parSapply* (only if *cluster* is set up with forking -- not the default).
The result is that if linear algebra is used within your parallel
code, R hangs. This has affected both openBLAS and ACML in the past,
though it may not affect current versions of these software.
If you find your R session hanging, before running an R job that does linear algebra,
you can set OMP_NUM_THREADS to 1 to prevent the BLAS from doing
threaded calculations. Alternatively, you can use MPI as the parallel
backend (via *doMPI* in place of *doParallel*).
You may also be able to convert your code to use *par{L,S}apply*
with the default PSOCK type and avoid *foreach* entirely.
### 2.4.3) Conflict between threaded BLAS and R profiling
There is also a conflict between threaded BLAS and R profiling, so
if you are using *Rprof*, you may need to set OMP_NUM_THREADS
to one. This has definitely occurred with openBLAS; I'm not sure about
other threaded BLAS libraries.
**Caution**: Note that I don't pay any attention to possible
danger in generating random numbers in separate processes in this Section. More on
this issue in Section 4.
## 2.5) Using an optimized BLAS on your own machine(s)
To use an optimized BLAS with R, talk to your systems administrator, see [Section A.3 of the R Installation and Administration Manual](https://cran.r-project.org/manuals.html), or see [these instructions to use *vecLib* BLAS from Apple's Accelerate framework on your own Mac](http://statistics.berkeley.edu/computing/blas).
It's also possible to use an optimized BLAS with Python's numpy and scipy packages, on either Linux or using the Mac's *vecLib* BLAS. Details will depend on how you install Python, numpy, and scipy.
# 3) Basic parallelized loops/maps/apply
All of the functionality discussed here applies *only* if the iterations/loops of your calculations can be done completely separately and do not depend on one another. This scenario is called an *embarrassingly parallel* computation. So coding up the evolution of a time series or a Markov chain is not possible using these tools. However, bootstrapping, random forests, simulation studies, cross-validation
and many other statistical methods can be handled in this way.
## 3.1) Parallel loops and *apply* functions in R
### 3.1.1) Parallel for loops with *foreach*
A simple way to exploit parallelism in R is to use the *foreach* package to do a for loop in parallel.
The *foreach* package provides a *foreach* command that
allows you to do this easily. *foreach* can use a variety of
parallel ``back-ends''. For our purposes, the main one is use of the *parallel* package to use shared
memory cores. When using *parallel* as the
back-end, you should see multiple processes (as many as you registered;
ideally each at 100%) when you monitor CPU usage. The multiple processes
are created by forking or using sockets. *foreach* can also use *Rmpi* or *SNOW* to access cores in
a distributed memory setting; please see the tutorial on distributed parallel processing mentioned above.
Here we'll parallelize leave-one-out cross-validation for a random forest model. An iteration involves holding out a data point, fitting the model with all the other data points, and then predicting the held-out point.
```{r, foreach}
```
(Note that the printed statements from `cat` are not showing up in the creation of this document but should show if you run the code.)
Note that *foreach*
also provides functionality for collecting and managing
the results to avoid some of the bookkeeping
you would need to do if writing your own standard for loop.
The result of *foreach* will generally be a list, unless
we request the results be combined in different way, as we do here using `.combine = c`.
You can debug by running serially using *%do%* rather than
*%dopar%*. Note that you may need to load packages within the
*foreach* construct to ensure a package is available to all of
the calculations.
It is possible to use foreach to parallelize over nested loops. Suppose that the outer loop has too few tasks to effectively parallelize over and you also want to parallelize over the inner loop as well. Provided the calculations in each task (defined based on the pair of indexes from both loops) are independent of the other tasks, you can define two foreach loops, with the outer foreach using the `%:%` operator and the inner foreach using the usual `%dopar%` operator. More details can be found [in this vignette](https://cran.r-project.org/web/packages/foreach/vignettes/nested.pdf).
### 3.1.2) Parallel apply functionality
The *parallel* package has the ability to parallelize the various
*apply* functions (apply, lapply, sapply, etc.). It's a bit hard to find the [vignette for the parallel package](http://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf)
because parallel is not listed as one of
the contributed packages on CRAN (it gets installed with R by default).
We'll consider parallel lapply and sapply. These rely on having started a cluster using *cluster*, which uses the PSOCK mechanism as in the SNOW package - starting new jobs via *Rscript*
and communicating via a technology called sockets.
```{r, parallel_lsApply, eval=TRUE}
```
Here the miniscule user time is probably because the time spent in the worker processes is not counted at the level of the overall master process that dispatches the workers.
For help with these functions and additional related parallelization functions (including *parApply*), see `help(clusterApply)`.
*mclapply* is an alternative that uses forking to start up the worker processes.
```{r, mclapply, eval=TRUE}
```
Note that some R packages can directly interact with the parallelization
packages to work with multiple cores. E.g., the *boot* package
can make use of the *parallel* package directly.
### 3.1.3) Loading packages and accessing global variables within your parallel tasks
Whether you need to explicitly load packages and export global variables from the master process to the parallelized worker processes depends on the details of how you are doing the parallelization.
With *foreach* with the *doParallel* backend, parallel *apply* statements (starting the cluster via *makeForkCluster*, instead of the usual *makeCluster*), and *mclapply*, packages and global variables in the main R process are automatically available to the worker tasks without any work on your part. This is because all of these approaches fork the original R process, thereby creating worker processes with the same state as the original R process. Interestingly, this means that global variables in the forked worker processes are just references to the objects in memory in the original R process. So the additional processes do not use additional memory for those objects (despite what is shown in *top*) and there is no time involved in making copies. However, if you modify objects in the worker processes then copies are made.
In contrast, with parallel *apply* statements when starting the cluster using the standard *makeCluster* (which sets up a so-called *PSOCK* cluster, starting the R worker processes via *Rscript*), one needs to load packages within the code that is executed in parallel. In addition one needs to use *clusterExport* to tell R which objects in the global environment should be available to the worker processes. This involves making as many copies of the objects as there are worker processes, so one can easily exceed the physical memory (RAM) on the machine if one has large objects, and the copying of large objects will take time.
## 3.2) Parallel looping in Python
I'll cover IPython parallel functionality, which allows one to parallelize on a single machine (discussed here) or across multiple machines (see the tutorial on distributed memory parallelization). There are a variety of other approaches one could use, of which I discuss two (the *pp* and *multiprocessing* packages) in the Appendix.
First we need to start our worker engines.
```{r, ipython-parallel-setup, engine='bash', eval=FALSE}
ipcluster start -n 4 &
sleep 45
```
Here we'll do the same random forest cross-validation as before.
```{r, ipython-parallel, engine='python', eval=FALSE}
```
Finally we stop the worker engines:
```{r, ipython-parallel-shutdown, engine='bash', eval=FALSE}
ipcluster stop
```
## 3.3) Basic shared memory parallel programming in MATLAB
To run a loop in parallel in MATLAB, you can use the *parfor*
construction. Before running the parfor you
need to start up a set of workers using *parpool*. MATLAB will use only one thread
per worker.
Here is some demo code:
```{r, matlab-parfor, engine='bash', eval=FALSE}
```
MATLAB has a default limit on the number of workers in a pool, but you can modify your MATLAB settings as follows to increase that limit (in this case to allow up to 32 workers). It should work to run the following code once in a MATLAB session, which will modify the settings for future MATLAB sessions.
```
cl = parcluster('local');
cl.NumWorkers = 32;
saveProfile(cl);
```
By default MATLAB uses a single thread (core) per worker. However you can also use multiple threads. Here is how you can set that up.
```
cl = parcluster('local');
cl.NumThreads = 2; % 2 threads per worker
cl.parpool(4); % 4 workers
```
## 3.4) Using *OpenMP* threads for basic shared memory programming in C
It's straightforward to write threaded code in C and C++ (as well as Fortran) to exploit multiple cores. The basic approach is to use the OpenMP protocol. Here's how one would parallelize a loop in C/C++ using an OpenMP compiler directive. In this case we are parallelizing the outer loop; the iterations of the outer loop are done in parallel, while the iterations of the inner loop are done serially within a thread. As with *foreach* in R, you only want to do this if the iterations do not depend on each other. The code is available as a C++ program (but the core of the code is just C code) in *testOpenMP.cpp*.
```
// see testOpenMP.cpp
#include <iostream>
using namespace std;
// compile with: g++ -fopenmp -L/usr/local/lib
// testOpenMP.cpp -o testOpenMP
int main(){
int nReps = 20;
double x[nReps];
#pragma omp parallel for
for (int i=0; i<nReps; i++){
x[i] = 0.0;
for ( int j=0; j<1000000000; j++){
x[i] = x[i] + 1.0;
}
cout << x[i] << endl;
}
return 0;
}
```
We would compile this program as follows
```
$ g++ -fopenmp testOpenMP.cpp -o testOpenMP
```
The main thing to be aware of in using OpenMP is not having different threads overwrite variables used by other threads. In the example above, variables declared within the `#pragma` directive will be recognized as variables that are private to each thread. In fact, you could declare `int i` before the compiler directive and things would be fine because OpenMP is smart enough to deal properly with the primary looping variable. But big problems would ensue if you had instead written the following code:
```
int main(){
int nReps = 20;
int j; // DON'T DO THIS !!!!!!!!!!!!!
double x[nReps];
#pragma omp parallel for
for (int i=0; i<nReps; i++){
x[i] = 0.0;
for (j=0; j<1000000000; j++){
x[i] = x[i] + 1.0;
}
cout << x[i] << endl;
}
return 0;
}
```
Note that we do want *x* declared before the compiler directive because we want all the threads to write to a common *x* (but, importantly, to different components of *x*). That's the point!
We can also be explicit about what is shared and what is private to each thread:
```
int main(){
int nReps = 20;
int i, j;
double x[nReps];
#pragma omp parallel for private(i,j) shared(x, nReps)
for (i=0; i<nReps; i++){
x[i] = 0.0;
for (j=0; j<1000000000; j++){
x[i] = x[i] + 1.0;
}
cout << x[i] << endl;
}
return 0;
}
```
## 3.5) Calling OpenMP-based C code from R
The easiest path here is to use the *Rcpp* package. In this case, you can write your C++ code with OpenMP pragma statemetns as in the previous subsection. You'll need to make sure that the *PKG_CXXFLAGS* and *PKG_LIBS* environment variables are set to include `-f openmp` so the compilation is done correctly. More details/examples linked to from [this Stack overflow post](http://stackoverflow.com/questions/22748358/rcpp-with-openmp).
# 4) Parallelization strategies
The following are some basic principles/suggestions for how to parallelize
your computation.
Should I use one machine/node or many machines/nodes?
- If you can do your computation on the cores of a single node using
shared memory, that will be faster than using the same number of cores
(or even somewhat more cores) across multiple nodes. Similarly, jobs
with a lot of data/high memory requirements that one might think of
as requiring Spark or Hadoop may in some cases be much faster if you can find
a single machine with a lot of memory.
- That said, if you would run out of memory on a single node, then you'll
need to use distributed memory.
What level or dimension should I parallelize over?
- If you have nested loops, you generally only want to parallelize at
one level of the code. That said, there may be cases in which it is
helpful to do both. Keep in mind whether your linear algebra is being
threaded. Often you will want to parallelize over a loop and not use
threaded linear algebra.
- Often it makes sense to parallelize the outer loop when you have nested
loops.
- You generally want to parallelize in such a way that your code is
load-balanced and does not involve too much communication.
How do I balance communication overhead with keeping my cores busy?
- If you have very few tasks, particularly if the tasks take different
amounts of time, often some processors will be idle and your code
poorly load-balanced.
- If you have very many tasks and each one takes little time, the communication
overhead of starting and stopping the tasks will reduce efficiency.
Should multiple tasks be pre-assigned to a process (i.e., a worker) (sometimes called *prescheduling*) or should tasks
be assigned dynamically as previous tasks finish?
- Basically if you have many tasks that each take similar time, you
want to preschedule the tasks to reduce communication. If you have few tasks
or tasks with highly variable completion times, you don't want to
preschedule, to improve load-balancing.
- For R in particular, some of R's parallel functions allow you to say whether the
tasks should be prescheduled. E.g., the *mc.preschedule* argument in *mclapply* and (in principle) the *parLapplyLB* function in place of *parLapply*. However, there appears to be a bug in *parLapplyLB* such that it doesn't actually act in a load-balanced way.
# 5) Random number generation (RNG) in parallel
The key thing when thinking about random numbers in a parallel context
is that you want to avoid having the same 'random' numbers occur on
multiple processes. On a computer, random numbers are not actually
random but are generated as a sequence of pseudo-random numbers designed
to mimic true random numbers. The sequence is finite (but very long)
and eventually repeats itself. When one sets a seed, one is choosing
a position in that sequence to start from. Subsequent random numbers
are based on that subsequence. All random numbers can be generated
from one or more random uniform numbers, so we can just think about
a sequence of values between 0 and 1.
The worst thing that could happen is that one sets things up in such
a way that every process is using the same sequence of random numbers.
This could happen if you mistakenly set the same seed in each process,
e.g., using *set.seed(mySeed)* in R on every process.
The naive approach is to use a different seed for each process. E.g.,
if your processes are numbered `id = 1,2,...,p` with a variable *id* that is unique
to a process, setting the seed to be the value of *id* on each process. This is likely
not to cause problems, but raises the danger that two (or more sequences)
might overlap. For an algorithm with dependence on the full sequence,
such as an MCMC, this probably won't cause big problems (though you
likely wouldn't know if it did), but for something like simple simulation
studies, some of your 'independent' samples could be exact replicates
of a sample on another process. Given the period length of the default
generators in R, MATLAB and Python, this is actually quite unlikely,
but it is a bit sloppy.
One approach to avoid the problem is to do all your RNG on one process
and distribute the random deviates, but this can be infeasible with
many random numbers.
More generally to avoid this problem, the key is to use an algorithm
that ensures sequences that do not overlap.
## 5.1) Ensuring separate sequences in R
In R, the *rlecuyer* package deals with this.
The L'Ecuyer algorithm has a period of $2^{191}$, which it divides
into subsequences of length $2^{127}$.
### 5.1.1) With the parallel package
Here's how you initialize independent sequences on different processes
when using the *parallel* package's parallel apply functionality
(illustrated here with *parSapply*).
```{r, RNG-apply, eval=TRUE}
```
If you want to explicitly move from stream to stream, you can use
*nextRNGStream*. For example:
```{r, RNGstream, eval=FALSE}
```
When using *mclapply*, you can use the *mc.set.seed* argument
as follows (note that *mc.set.seed* is TRUE by default, so you
should get different seeds for the different processes by default),
but one needs to invoke `RNGkind("L'Ecuyer-CMRG")`
to get independent streams via the L'Ecuyer algorithm.
```{r, RNG-mclapply, eval=TRUE}
```
The documentation for *mcparallel* gives more information about
reproducibility based on *mc.set.seed*.
### 5.1.2) With foreach
#### Getting independent streams
One question is whether *foreach* deals with RNG correctly. This
is not documented, but the developers (Revolution Analytics) are well
aware of RNG issues. Digging into the underlying code reveals that
the *doParallel* backend invokes *mclapply*
and sets *mc.set.seed* to TRUE by default. This suggests that
the discussion above r.e. *mclapply* holds for *foreach*
as well, so you should do `RNGkind("L'Ecuyer-CMRG")`
before your foreach call.
#### Ensuring reproducibility
While using *foreach* as just described should ensure that the
streams on each worker are are distinct, it does not ensure reproducibility
because task chunks may be assigned to workers differently in different
runs and the substreams are specific to workers, not to tasks.
For backends other than *doMPI*, such as *doParallel*, there is a package
called *doRNG* that ensures that *foreach* loops are reproducible. (For *doMPI* you simply pass `.options.mpi = list(seed = your_seed_value_here)` as an additional argument to *foreach*.)
Here's how you do it:
```{r, RNG-doRNG}
```
You can ignore the warnings about closing unused connections printed out above.
## 5.2) RNG in Python
Python uses the Mersenne-Twister generator. If you're using the RNG
in *numpy/scipy*, you can set the seed using `numpy.random.seed` or `scipy.random.seed`.
The advice I'm seeing online in various Python forums is to just set
separate seeds, so it appears the Python is a bit behind R and MATLAB here.
There is a function *random.jumpahead* that
allows you to move the seed ahead as if a given number of random numbers
had been generated, but this function will not be in Python 3.x, so
I won't suggest using it.
## 5.3) RNG in MATLAB
MATLAB also uses the Mersenne-Twister. We can set the seed as: `rng(seed)`,
with seed being a non-negative integer.
Happily, like R, we can set up independent streams, using either of
the Combined Multiple Recursive ('mrg32k3a') and the Multiplicative
Lagged Fibonacci ('mlfg6331_64') generators. Here's an example, where
we create the second of the 5 streams, as if we were using this code
in the second of our parallel processes. The `'Seed',0` part
is not actually needed as that is the default.
```
thisStream = 2;
totalNumStreams = 5;
seed = 0;
cmrg1 = RandStream.create('mrg32k3a', 'NumStreams', totalNumStreams,
'StreamIndices', thisStream, 'Seed', seed);
RandStream.setGlobalStream(cmrg1);
randn(5, 1)
```
# Appendix: Other shared memory parallelization functionality in R and MATLAB
## Other parallel functionality in R
One can use *mcparallel* in the *parallel* package to
send different chunks of code to different processes. Here we would
need to manage the number of tasks so that we don't have more tasks
than available cores.
```{r, mcparallel, eval=TRUE}
```
Note that *mcparallel* also allows the use of the *mc.set.seed*
argument as with *mclapply*.
Note that on the cluster, one should create only as many parallel
blocks of code as were requested when submitting the job.
Now let's consider parallel evaluation of a vectorized function. This
will often only be worthwhile on very long vectors and for computationally
intensive calculations. (The *Matern* call here is more time-consuming
than *exp*).
```{r, pvec, eval=TRUE}
```
## Manually parallelizing individual tasks in MATLAB
In addition to using *parfor* in MATLAB, you can also explicitly program parallelization, managing the individual
parallelized tasks. Here is some template code for doing this. We'll
submit our jobs to a pool of workers so that we have control over
how many jobs are running at once. Note that here I submit 6 jobs
that call the same function, but the different jobs could call different
functions and have varying inputs and outputs. MATLAB will run as
many jobs as available workers in the pool and will queue the remainder,
starting them as workers in the pool become available. Here is
some demo code
```{r, matlab-batch, engine='bash', eval=FALSE}
```
And if you want to run threaded code in a given job, you can do that
by setting the number of threads within the function called by *parfeval*.
See the *testThread.m* file for the *testThread*
function.
```{r, matlab-batch-thread, engine='bash', eval=FALSE}
```
## Other approaches to parallelization in Python
### *pp* package
Here we create a server object and submit jobs to the server object,
which manages the farming out of the tasks. Note that this will run
interactively in IPython or as a script from UNIX, but will not run
interactively in the base Python interpreter (for reasons that are
unclear to me). Also note that while we are illustrating this as basically
another parallelized for loop, the individual jobs can be whatever
calculations you want, so the *taskFun* function could change from
job to job.
```{r, python-pp, engine='python', eval=FALSE}
```
```
[(0, 0.00030280243091597474), (1, -8.5722825540149767e-05), (2, 0.00013566614947237407), (3, 0.00061310818505479474), (4, -0.0004702706491795272), (5, 0.00024515486966970537), (6, -0.00017472300458822845), (7, -0.00025050095623507584), (8, -0.00033399772183492841), (9, -0.00049137138871004158), (10, 0.00029251318047107422), (11, 1.1956375483643322e-05), (12, -0.00010810414999124078), (13, 0.00015533121727716678), (14, -0.00092143784872822018), (15, -7.4020047531168942e-05), (16, -0.00027179897723462343), (17, -0.00020500359099047446), (18, 5.0102720605584639e-05), (19, -0.00031948846032527046), (20, -5.4961570167677311e-05), (21, -0.00057477384497516828), (22, 0.00035571929916218195), (23, 0.0003172760600221845), (24, -3.9757431343687736e-05), (25, 0.00037903275195759294), (26, -0.00010435497860874407), (27, 0.0001701626336006962), (28, -0.00069358450543517865), (29, 0.00067886194920371693), (30, -0.00051310981441539557), (31, -3.0022955955111069e-05), (32, -0.00063590672702952002), (33, -0.00031966078315322541), (34, -0.00015649509164027703), (35, 0.00028376009875884391), (36, 0.00018534703816611961), (37, -0.00021821998172858934), (38, 8.0842394421238762e-05), (39, -0.00014637870897851111)]
```
### *multiprocessing* package
Here we'll use the *Pool.map* method
to iterate in a parallelized fashion, as the Python analog to *foreach*
or *parfor*. *Pool.map* only supports having a single
argument to the function being used, so we'll use list of tuples,
and pass each tuple as the argument.
```{r, python-mp, engine='python', eval=TRUE}
```
## More advanced use of *OpenMP* in C
The goal here is just to give you a sense of what is possible with OpenMP.
The OpenMP API provides three components: compiler directives that parallelize your code (such as `#pragma omp parallel for`), library functions (such as `omp_get_thread_num()`), and environment variables (such as `OMP_NUM_THREADS`)
OpenMP constructs apply to structured blocks of code. Blocks may be executed in parallel or sequentially, depending on how one uses the OpenMP pragma statements. One can also force execution of a block to wait until particular preceding blocks have finished, using a *barrier*.
Here's a basic "Hello, world" example that illustrates how it works (the full program is in *helloWorldOpenMP.cpp*):
```
// see helloWorldOpenMP.cpp
#include <stdio.h>
#include <omp.h> // needed when using any openMP functions
// like omp_get_thread_num()
void myFun(double *in, int id){
// this is the function that would presumably do the heavy computational stuff
}
int main()
{
int nthreads, myID;
double* input;
/* make the values of nthreads and myid private to each thread */
#pragma omp parallel private (nthreads, myID)
{ // beginning of block
myID = omp_get_thread_num();
printf("Hello, I am thread %d\n", myID);
myFun(input, myID); // do some computation on each thread
/* only master node print the number of threads */
if (myid == 0)
{
nthreads = omp_get_num_threads();
printf("I'm the boss and control %i threads. How come they're in front of me?\n", nThreads);
}
} // end of block
return 0;
}
```
The *parallel* directive starts a team of threads, including the master, which is a member of the team and has thread number 0. The number of threads is determined in the following ways - here the first two options specify four threads:
1. #pragma omp parallel NUM_THREADS (4) // set 4 threads for this parallel block
2. omp_set_num_threads(4) // set four threads in general
3. the value of the OMP_NUM_THREADS environment variable
4. a default - usually the number of cores on the compute node
Note that in `#pragma omp parallel for`, there are actually two instructions, `parallel` starts a team of threads, and `for` farms out the iterations to the team. In our parallel for invocation, we could have done it more explicitly as:
```
#pragma omp parallel
#pragma omp for
```
We can also explicitly distribute different chunks of code amongst different threads as seen here and in the full program in *sectionsOpenMP.cpp*.
```
// see sectionsOpenMP.cpp
#pragma omp parallel // starts a new team of threads
{
Work0(); // this function would be run by all threads.
#pragma omp sections // divides the team into sections
{
// everything herein is run only once.
#pragma omp section
{ Work1(); }
#pragma omp section
{
Work2();
Work3();
}
#pragma omp section
{ Work4(); }
}
} // implied barrier
```
Here Work1, {Work2 + Work3} and Work4 are done in parallel, but Work2 and Work3 are done in sequence (on a single thread).
If one wants to make sure that all of a parallized calculation is complete before any further code is executed you can insert `#pragma omp barrier`.
Note that a `#pragma for` statement includes an implicit barrier as does the end of any block specified with `#pragma omp parallel`.
You can use `nowait` if you explicitly want to prevent threads from waiting at an implicit barrier: e.g., `#pragma omp parallel sections nowait` or `#pragma omp parallel for nowait`
One should be careful about multiple threads writing to the same variable at the same time (this is an example of a race condition). In the example below, if one doesn't have the `#pragma omp critical` directive two threads could read the current value of *result* at the same time and then sequentially write to *result* after incrementing their local copy, which would result in one of the increments being lost. A way to avoid this is with the *critical* directive (for single lines of code you can also use `atomic` instead of `critical`), as seen here and in the full program in *criticalOpenMP.cpp*:
```
// see criticalOpenMP.cpp
double result = 0.0;
double tmp;
#pragma omp parallel for private (tmp, i) shared (result)
for (int i=0; i<n; i++){
tmp = myFun(i);
#pragma omp critical
result += tmp;
}
```
You should also be able to use syntax like the following for the parallel for declaration (in which case you shouldn't need the `#pragma omp critical`):
```
#pragma omp parallel for reduction(+:result)
```
I believe that doing this sort of calculation where multiple threads write to the same variable may be rather inefficient given time lost in waiting to have access to result, but presumably this would depend on how much time is spent in *myFun()* relative to the reduction operation.