There are multiple ways of parallelizing C/C++ and Python codes to be used on clusters. The quick and dirty way, which often gets the job done, is to parallelize the code with MPI and launch one process per core across multiple nodes. Though easy to implement, this strategy may result in a fair waste of resources because:
- With too many processes on one node, inefficient use of memory may make the code memory-bottlenecked, meaning that a lot of time will be spent accessing memory while the cores sit twiddling their thumbs.
- If you have a 48-core node with 192 GB of memory (as the Skylake nodes on Stampede 2) and your code uses 10 GB of RAM per process, you will not be able to have more than 19 of the 48 cores working at the same time. Exceeding the node's memory would crash the node, which in turn may get you kicked out of the cluster for a while without warning. If you do not know how much RAM your code requires, you should look into the Remora tool.
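The memory arithmetic above can be sketched in a few lines of Python (a hypothetical helper, not from the repository; the numbers match the Stampede 2 Skylake example):

```python
# Back-of-the-envelope check: how many processes fit in a node's RAM?
def max_concurrent_processes(node_memory_gb, memory_per_process_gb, n_cores):
    # The process count is capped by both memory and available cores.
    return min(node_memory_gb // memory_per_process_gb, n_cores)

# 48-core, 192 GB node with 10 GB per process: only 19 cores can be kept busy.
print(max_concurrent_processes(192, 10, 48))  # -> 19
```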
If you are working on big runs on systems that charge per usage time, or if you are creating a code you will have to run multiple times, it may be worth looking into more clever parallelization strategies. Some of the techniques described in the next subsections will also make your code two to four times faster on your desktop/laptop and are rather straightforward to implement. Most of the code in this blog post can be freely downloaded from this repository. Lastly, if the #include statements in the examples below are not displayed as followed by a library between "<>", please refer to the mentioned online repository.
Hybrid Parallelization in C/C++
The word hybrid refers in this context to the mixed use of shared- and distributed-memory parallelization techniques. In distributed-memory parallelization techniques, such as the Message Passing Interface (MPI), each process has its own dedicated chunk of memory within a RAM chip. This means that each process will have its own copy of the input data (repeated across processes), of the variables within the program, and so on. Conversely, shared-memory parallelization techniques such as OpenMP allow all threads (the parallel units of computation) to access the same block of memory. This can make a mess in your code, with variables seemingly changed out of nowhere by various parallel runs, but it lessens the amount of RAM needed and in some cases allows for more efficient memory utilization.
If running the Borg Master-Slave multiobjective evolutionary optimization algorithm (Borg MS) with hybrid parallelization, Borg MS will have each function evaluation performed by one MPI process, which in turn may run in parallel with OpenMP (if your model is set up for that) across the cores designated for that process. As soon as a function evaluation is complete, the slave MPI process in charge of it will submit the results back to Borg MS's master process and be given another set of decision variables to work on. The figure below is a schematic drawing of such a hybrid parallelization scheme:
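While Borg MS itself uses MPI, the master-slave hand-off it relies on can be mimicked on a single machine with Python's multiprocessing.Pool. This is only an illustrative sketch, with a toy evaluate function standing in for a real function evaluation:

```python
from multiprocessing import Pool

# Toy "function evaluation": a stand-in for one model run inside Borg MS.
def evaluate(decision_variables):
    return sum(decision_variables)

if __name__ == '__main__':
    tasks = [(1, 2), (3, 4), (5, 6)]
    with Pool(2) as pool:
        # imap_unordered hands a new task to each worker as soon as it
        # reports a result back, regardless of finishing order.
        results = sorted(pool.imap_unordered(evaluate, tasks))
    print(results)  # -> [3, 7, 11]
```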
OpenMP
Parallelizing a for-loop in C or C++ in which each iteration is independent, such as in Monte Carlo simulations, is straightforward:
```c
#include <omp.h>
#include "path/to/my_model.h"

int main(int argc, char *argv[]) {
    // One model run per core. Note omp_get_max_threads() is used here:
    // omp_get_num_threads() would return 1 outside a parallel region.
    int n_monte_carlo_simulations = omp_get_max_threads();
    vector<double> system_inputs(n_monte_carlo_simulations); // Populate with your model's inputs.
    vector<double> results(n_monte_carlo_simulations, 0);

    #pragma omp parallel for
    for (int i = 0; i < n_monte_carlo_simulations; ++i) {
        results[i] = RunMonteCarloSimulation(system_inputs[i]);
    }
}
```
The only caveat is that you have to be sure the independent simulations are not writing over each other. For example, if by mistake you had written results[0] instead of results[i], the final value in results[0] would be that of whichever thread (parallel simulation) finished last, while the rest of the vector would be zeros. It is also important to notice that the vector system_inputs is read only once by the parallel code, and all n_monte_carlo_simulations threads have access to it in memory.
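The same write-your-own-slot caveat can be demonstrated in Python, whose threads share memory just like OpenMP threads do (a hypothetical stand-alone sketch, not from the repository):

```python
import threading

n_simulations = 8
results = [0] * n_simulations  # shared by all threads

def run_simulation(i):
    # Correct: each thread writes only to its own slot. Writing to
    # results[0] instead would leave the last finisher's value there
    # and zeros everywhere else.
    results[i] = i * 10

threads = [threading.Thread(target=run_simulation, args=(i,))
           for i in range(n_simulations)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # -> [0, 10, 20, 30, 40, 50, 60, 70]
```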
Though this example is not rocket science, there is a lot more to OpenMP, which you can learn about here and here.
MPI
MPI stands for Message Passing Interface. It creates processes which pass messages (information packages) to each other. The most basic MPI code is one that creates independent processes, each with its own allocated chunk of memory that cannot be accessed by any other process, and just has them running independently in parallel without communicating with each other. This is great for distributing Monte Carlo runs across multiple nodes, although it may run into the limitations mentioned at the beginning of this post. Below is an example of a very basic MPI code to run Monte Carlo simulations in parallel:
```c
#include <mpi.h>
#include "path/to/my_model.h"

int main(int argc, char *argv[]) {
    MPI_Init(NULL, NULL);

    int world_rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    vector<double> system_inputs(world_size); // Populate with one input per MPI process.
    RunMonteCarloSimulation(system_inputs[world_rank]);

    MPI_Finalize();
    return 0;
}
```
I will not cover in this post how to pass the output of the RunMonteCarloSimulation function back to a results vector, as in the previous example, because the details of MPI are outside the scope of this post. A good MPI tutorial can be found here, and great examples, including ones for Monte Carlo runs, can be found here. To run this code, you would have to compile it with mpicc mpi_demo.c -o MPIDemo and call it with the mpirun or mpiexec commands, followed by the -n flag to specify the number of processes to be created, e.g. mpirun -n 4 ./MPIDemo. The code above would run as many Monte Carlo simulations as there are processes (four if -n 4 was used), each with its own rank (here, a type of ID), and can be spread across as many nodes as you can get on a cluster. Each process would run on one core, which may result in the memory and/or processing limitations listed at the beginning of this post.
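Once each process knows its rank, assigning work is just indexing. The mapping can be sketched without MPI at all (a hypothetical helper; in a real run, MPI_Comm_rank and MPI_Comm_size would supply the rank and process count):

```python
# Round-robin assignment of Monte Carlo inputs to MPI ranks.
def inputs_for_rank(system_inputs, rank, n_processes):
    # Rank r takes items r, r + n_processes, r + 2*n_processes, ...
    return [x for i, x in enumerate(system_inputs) if i % n_processes == rank]

all_inputs = list(range(8))
print(inputs_for_rank(all_inputs, 0, 4))  # -> [0, 4]
print(inputs_for_rank(all_inputs, 3, 4))  # -> [3, 7]
```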
Mixing OpenMP and MPI
The ideal situation in most cases is to use MPI to distribute processes across nodes and OpenMP to keep as many cores within each node busy as it makes sense to use. To do this, you can use the structure of the code below.
```c
#include <mpi.h>
#include <omp.h>
#include <stdio.h>
#include "path/to/my_model.cpp"

int main(int argc, char *argv[]) {
    MPI_Init(NULL, NULL);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    int n_cores = omp_get_max_threads(); // Number of cores available to this MPI process.
    vector<double> system_inputs(n_cores); // Populate with your model's inputs.
    vector<double> results(n_cores, 0);

    #pragma omp parallel for
    for (int i = 0; i < n_cores; ++i) {
        // printf("C hybrid. Process-thread: %d-%d\n", world_rank, omp_get_thread_num());
        results[i] = RunMonteCarloSimulation(system_inputs[i]);
    }

    MPI_Finalize();
    return 0;
}
```
If you comment out the #include "path/to/my_model.cpp" line and the line assigning to results[i], uncomment the printf line, compile the code with mpicc hybrid.c -o Hybrid -fopenmp, set the number of threads in the terminal to eight by running export OMP_NUM_THREADS=8, and call the resulting executable with the command mpirun -n 4 ./Hybrid, the executable will spawn four processes, each of which spawns eight threads and prints the following output:
```
C hybrid. Process-thread: 0-0
C hybrid. Process-thread: 0-1
C hybrid. Process-thread: 0-4
C hybrid. Process-thread: 0-5
C hybrid. Process-thread: 0-7
C hybrid. Process-thread: 0-6
C hybrid. Process-thread: 0-2
C hybrid. Process-thread: 0-3
C hybrid. Process-thread: 1-6
C hybrid. Process-thread: 1-0
C hybrid. Process-thread: 1-1
C hybrid. Process-thread: 1-3
C hybrid. Process-thread: 1-5
C hybrid. Process-thread: 1-4
C hybrid. Process-thread: 1-7
C hybrid. Process-thread: 1-2
C hybrid. Process-thread: 2-6
C hybrid. Process-thread: 2-0
C hybrid. Process-thread: 2-3
C hybrid. Process-thread: 2-7
C hybrid. Process-thread: 2-2
C hybrid. Process-thread: 2-4
C hybrid. Process-thread: 2-5
C hybrid. Process-thread: 2-1
C hybrid. Process-thread: 3-0
C hybrid. Process-thread: 3-1
C hybrid. Process-thread: 3-3
C hybrid. Process-thread: 3-4
C hybrid. Process-thread: 3-6
C hybrid. Process-thread: 3-7
C hybrid. Process-thread: 3-2
C hybrid. Process-thread: 3-5
```
The code above can be used to have one process per node (and therefore one copy of system_inputs per node) using as many threads as there are cores in that node. Depending on the number of cores, it is sometimes more efficient to have two or three MPI processes per node, each using half or a third of the cores in that node. Below is a PBS job submission script that can be used to generate the output above; job submission scripts for Slurm to be used on Stampede 2 can be found here.
```bash
#!/bin/bash
#PBS -N test_hybrid
#PBS -l nodes=2:ppn=16
#PBS -l walltime=00:00:30
#PBS -o ./output/hybrid_c.out
#PBS -e ./error/hybrid_c.err

module load openmpi-1.10.7-gnu-x86_64
export OMP_NUM_THREADS=8

set -x
cd "$PBS_O_WORKDIR"

# Construct a copy of the hostfile with only 16 entries per node.
# MPI can use this to run 16 tasks on each node.
export TASKS_PER_NODE=2
uniq "$PBS_NODEFILE" | awk -v TASKS_PER_NODE="$TASKS_PER_NODE" '{for(i=0;i<TASKS_PER_NODE;i++) print}' > nodefile
#cat nodefile

mpiexec --hostfile nodefile -n 4 -x OMP_NUM_THREADS ./Hybrid
```
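The processes-per-node tradeoff above boils down to simple integer division. A hypothetical helper for picking a value of OMP_NUM_THREADS:

```python
# How many OpenMP threads each MPI process should use so that
# processes_per_node * threads fills the node's cores exactly.
def threads_per_process(cores_per_node, processes_per_node):
    return cores_per_node // processes_per_node

print(threads_per_process(16, 2))  # -> 8, matching OMP_NUM_THREADS=8 in the script above
print(threads_per_process(48, 3))  # -> 16
```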
Parallelizing Python on a Cluster
Python was not designed with shared-memory parallelization in mind, so its native parallelization library, the Multiprocessing library, performs distributed-memory parallelization instead of shared. The issue with the Multiprocessing library is that it does not work across nodes, so you still need something like MPI. Here, we will use MPI4Py.
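The distributed-memory nature of the Multiprocessing library is easy to verify: a child process gets its own copy of the parent's variables, so changes made in the child are invisible to the parent. A minimal sketch (assuming a Unix-like system where child processes are forked):

```python
from multiprocessing import Process

counter = 0  # lives in the parent process's memory

def increment():
    global counter
    counter += 1  # modifies only the child's private copy

if __name__ == '__main__':
    p = Process(target=increment)
    p.start()
    p.join()
    print(counter)  # -> 0: the parent's copy was never touched
```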
The code below parallelizes a function call across cores within the same node; please refer to this blog post for details about, and other ways of using, the Multiprocessing library.
```python
from multiprocessing import Process, cpu_count

def do_something_useful(shared_process_number):
    # Do something useful here.
    print('Python multiprocessing, local process (not quite a thread): {}'.format(shared_process_number))

# Create node-local processes
shared_processes = []
#for i in range(cpu_count()):
for i in range(8):
    p = Process(target=do_something_useful, args=(i,))
    shared_processes.append(p)

# Start processes
for sp in shared_processes:
    sp.start()

# Wait for all processes to finish
for sp in shared_processes:
    sp.join()
```
By adding MPI4Py to the code above, following the same logic of the C/C++ example, we get the following result:
```python
from mpi4py import MPI
from multiprocessing import Process, cpu_count

def do_something_useful(rank, shared_process_number):
    # Do something useful here.
    print('Python hybrid, MPI_Process-local_process (not quite a thread): {}-{}'.format(rank, shared_process_number))

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

print('Calling Python multiprocessing process from Python MPI rank {}'.format(rank))

# Create shared-ish processes
shared_processes = []
#for i in range(cpu_count()):
for i in range(8):
    p = Process(target=do_something_useful, args=(rank, i))
    shared_processes.append(p)

# Start processes
for sp in shared_processes:
    sp.start()

# Wait for all processes to finish
for sp in shared_processes:
    sp.join()

comm.Barrier()
```
Calling the code above with mpirun -n 4 python hybrid_pure_python.py will first result in four MPI processes being spawned with ranks 0 through 3, each spawning eight local processes. The output should look similar to the one below:
```
Calling Python multiprocessing process from Python MPI rank 0
Calling Python multiprocessing process from Python MPI rank 1
Python hybrid, MPI_Process-local_process (not quite a thread): 1-0
Python hybrid, MPI_Process-local_process (not quite a thread): 0-0
Python hybrid, MPI_Process-local_process (not quite a thread): 1-1
Python hybrid, MPI_Process-local_process (not quite a thread): 0-1
Python hybrid, MPI_Process-local_process (not quite a thread): 1-2
Python hybrid, MPI_Process-local_process (not quite a thread): 0-2
Python hybrid, MPI_Process-local_process (not quite a thread): 0-3
Python hybrid, MPI_Process-local_process (not quite a thread): 1-3
Calling Python multiprocessing process from Python MPI rank 2
Calling Python multiprocessing process from Python MPI rank 3
Python hybrid, MPI_Process-local_process (not quite a thread): 1-4
Python hybrid, MPI_Process-local_process (not quite a thread): 0-4
Python hybrid, MPI_Process-local_process (not quite a thread): 0-5
Python hybrid, MPI_Process-local_process (not quite a thread): 1-5
Python hybrid, MPI_Process-local_process (not quite a thread): 1-6
Python hybrid, MPI_Process-local_process (not quite a thread): 0-6
Python hybrid, MPI_Process-local_process (not quite a thread): 1-7
Python hybrid, MPI_Process-local_process (not quite a thread): 0-7
Python hybrid, MPI_Process-local_process (not quite a thread): 2-0
Python hybrid, MPI_Process-local_process (not quite a thread): 3-0
Python hybrid, MPI_Process-local_process (not quite a thread): 3-1
Python hybrid, MPI_Process-local_process (not quite a thread): 2-1
Python hybrid, MPI_Process-local_process (not quite a thread): 3-2
Python hybrid, MPI_Process-local_process (not quite a thread): 2-2
Python hybrid, MPI_Process-local_process (not quite a thread): 3-3
Python hybrid, MPI_Process-local_process (not quite a thread): 2-3
Python hybrid, MPI_Process-local_process (not quite a thread): 3-4
Python hybrid, MPI_Process-local_process (not quite a thread): 2-4
Python hybrid, MPI_Process-local_process (not quite a thread): 2-5
Python hybrid, MPI_Process-local_process (not quite a thread): 3-5
Python hybrid, MPI_Process-local_process (not quite a thread): 3-6
Python hybrid, MPI_Process-local_process (not quite a thread): 3-7
Python hybrid, MPI_Process-local_process (not quite a thread): 2-6
Python hybrid, MPI_Process-local_process (not quite a thread): 2-7
```
Below is a PBS submission script that can be used with the code above:
```bash
#!/bin/bash
#PBS -N test_hybrid
#PBS -l nodes=2:ppn=16
#PBS -l walltime=00:01:00
#PBS -o ./output/hybrid_python.out
#PBS -e ./error/hybrid_python.err

module load python-2.7.5
export OMP_NUM_THREADS=8

set -x
cd "$PBS_O_WORKDIR"

# Construct a copy of the hostfile with only 16 entries per node.
# MPI can use this to run 16 tasks on each node.
export TASKS_PER_NODE=2
uniq "$PBS_NODEFILE" | awk -v TASKS_PER_NODE="$TASKS_PER_NODE" '{for(i=0;i<TASKS_PER_NODE;i++) print}' > nodefile
#cat nodefile

mpiexec --hostfile nodefile -n 4 -x OMP_NUM_THREADS python hybrid_pure_python.py
```
Parallelizing Monte Carlo Runs of C/C++ (or anything else) with Python
Very often we have to run the same model dozens to thousands of times for Monte-Carlo-style analyses. One option is to go on a cluster and submit dozens to thousands of jobs, but this becomes a mess for the scheduler to manage, and cluster administrators tend not to be very fond of such an approach. A cleaner way of performing Monte Carlo runs in parallel is to write a code in Python or another language that spawns MPI processes which then call the model to be run. This can be done in Python with the subprocess library, as in the example below:
```python
from mpi4py import MPI
import subprocess

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

print('Calling OpenmpTest from Python MPI rank {}'.format(rank))
function_call = ['./OpenmpTest', '{}'.format(rank)]
subprocess.call(function_call)

comm.Barrier()
```
Here, OpenmpTest is an executable compiled from the C-OpenMP code below:
```c
#include <stdio.h>
#include <omp.h>

int main(int argc, char *argv[]) {
    #pragma omp parallel for
    for (int i = 0; i < omp_get_max_threads(); ++i) {
        printf("C OpenMP. Process-thread: %s-%d\n", argv[1], i);
    }
    return 0;
}
```
Submitting the code above with the following PBS job submission script should result in the output below.
```bash
#!/bin/bash
#PBS -N test_hybrid
#PBS -l nodes=2:ppn=16
#PBS -l walltime=00:01:00
#PBS -o ./output/hybrid_python_c.out
#PBS -e ./error/hybrid_python_c.err

module load python-2.7.5
export OMP_NUM_THREADS=8

set -x
cd "$PBS_O_WORKDIR"

# Construct a copy of the hostfile with only 16 entries per node.
# MPI can use this to run 16 tasks on each node.
export TASKS_PER_NODE=2
uniq "$PBS_NODEFILE" | awk -v TASKS_PER_NODE="$TASKS_PER_NODE" '{for(i=0;i<TASKS_PER_NODE;i++) print}' > nodefile
#cat nodefile

mpiexec --hostfile nodefile -n 4 -x OMP_NUM_THREADS python hybrid_calls_c.py
```
Output:
```
Calling OpenmpTest from Python MPI rank 0
Calling OpenmpTest from Python MPI rank 1
Calling OpenmpTest from Python MPI rank 2
Calling OpenmpTest from Python MPI rank 3
C OpenMP. Process-thread: 0-7
C OpenMP. Process-thread: 0-5
C OpenMP. Process-thread: 0-0
C OpenMP. Process-thread: 0-2
C OpenMP. Process-thread: 0-4
C OpenMP. Process-thread: 0-1
C OpenMP. Process-thread: 0-3
C OpenMP. Process-thread: 0-6
C OpenMP. Process-thread: 1-1
C OpenMP. Process-thread: 1-2
C OpenMP. Process-thread: 1-6
C OpenMP. Process-thread: 1-4
C OpenMP. Process-thread: 1-5
C OpenMP. Process-thread: 1-0
C OpenMP. Process-thread: 1-3
C OpenMP. Process-thread: 1-7
C OpenMP. Process-thread: 2-3
C OpenMP. Process-thread: 2-7
C OpenMP. Process-thread: 2-0
C OpenMP. Process-thread: 2-4
C OpenMP. Process-thread: 2-6
C OpenMP. Process-thread: 2-5
C OpenMP. Process-thread: 2-1
C OpenMP. Process-thread: 2-2
C OpenMP. Process-thread: 3-5
C OpenMP. Process-thread: 3-1
C OpenMP. Process-thread: 3-4
C OpenMP. Process-thread: 3-2
C OpenMP. Process-thread: 3-7
C OpenMP. Process-thread: 3-3
C OpenMP. Process-thread: 3-0
C OpenMP. Process-thread: 3-6
```
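If you want to experiment with this launcher pattern without a cluster or MPI, the same subprocess mechanics can be tried locally. In this hypothetical sketch, an inline Python one-liner stands in for the OpenmpTest executable:

```python
import subprocess
import sys

# Launch several independent "model runs" as separate OS processes,
# passing each one its rank on the command line (as done with OpenmpTest).
processes = []
for rank in range(4):
    cmd = [sys.executable, '-c',
           'import sys; print("Toy model run, rank " + sys.argv[1])',
           str(rank)]
    processes.append(subprocess.Popen(cmd, stdout=subprocess.PIPE))

# Wait for all runs and collect their output.
for p in processes:
    out, _ = p.communicate()
    print(out.decode().strip())
```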
End of the Story
As highlighted in the examples above (all of which can be downloaded from here), creating parallel code is not rocket science and requires very few lines of code. The OpenMP and Multiprocessing parallelization tricks can save hours if you have to run a model several times on your personal computer, and paired with MPI these tricks can save you from hours to days of wall-clock time and thousands of node-hours on clusters. I hope you make good use of them.