Developing parallelised code with MPI for dummies, in C (Part 2/2)

My last post introduced MPI and demonstrated a simple example of using it to parallelize a code across multiple nodes. In the previous example, we created an executable that could be run in parallel to complete the same task multiple times. But what if we want to use MPI on a code that has both parallel and serial sections? This is inevitable if we want everything to be self-contained.

As I tried to stress last time, MPI runs multiple versions of the same executable each with independent memory (please read this sentence three times, it is very different from how you learned to code). If you wish to share memory, you must explicitly send it. This allows no scope for a serial section!

We must, instead, imitate serial sections of code by designating a ‘root’ processor, conventionally the processor with rank = 0. We trap the ‘serial section’ inside an if-statement designating the root and send data to it from other processors when required.

Sending Data

I will build on the previous two examples by creating a loop that calculates the mean of a set of random numbers. We will parallelize the random number generation but leave the mean calculation in ‘serial’ (i.e. to be calculated by the root processor).

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int main(){
int size, rank,i;
MPI_Init(NULL,NULL);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
double randSum = 0;
srand(rank + 1);
double myRand = (double)rand()/(double)RAND_MAX;
printf("I evaluated rank = %d, myRand = %f\n",rank,myRand);
if (rank == 0){
   for (i=0;i<size;i++){
      if (i > 0){
         MPI_Recv(&myRand, 1, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      }
      randSum = randSum + myRand;
   }
   printf("Mean random number = %f\n",randSum/size);
}
else{
   MPI_Send(&myRand, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
}
MPI_Finalize();
return 0;
}

For contrast with a regular serial version:

#include <stdio.h>
#include <stdlib.h>
int main(){
int rank, size = 10;
double randSum = 0;
/* the loop counter plays the role of the MPI rank */
for (rank = 0; rank < size; ++rank){
   srand(rank + 1);
   double myRand = (double)rand()/(double)RAND_MAX;
   printf("I evaluated rank = %d, myRand = %f\n",rank,myRand);
   randSum = randSum + myRand;
}
printf("Mean random number = %f\n",randSum/size);
return 0;
}

We introduce here two new MPI functions:

MPI_Send(data address, number of elements, MPI type of data, processor destination (by rank), tag, communicator) sends data to another processor; in our case it sends the random number to the root (rank 0).

MPI_Recv(data address, number of elements, MPI type of data, processor source (by rank), tag, communicator, status) tells a processor, in our case the root, to receive data from a processor source. The final argument is an MPI_Status structure; passing MPI_STATUS_IGNORE suppresses it.

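Both MPI_Send and MPI_Recv are blocking calls. MPI_Recv does not return until the message has arrived, and MPI_Send does not return until the send buffer is safe to reuse, which for small messages may happen before the matching receive has been posted. In our example the root receives in rank order, so the message from rank = 5 is only received after the root has received data from ranks 1:4.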

Broadcasting data

Sending data between processors in MPI is moderately expensive, so we want to call send/recv as few times as possible. This means that vectors should be sent in a single call rather than element by element in a loop. It also means that when sending data from one processor to all (most commonly from the root), it is more efficient to use the built-in ‘broadcast’ rather than sending to each processor individually (the reason for this is explained in: http://mpitutorial.com/tutorials/mpi-broadcast-and-collective-communication/).
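To see why one large message beats many small ones, a rough cost model can help. The sketch below is not MPI code; it assumes the common approximation that each message costs a fixed start-up time (latency) plus a per-byte transfer time. The function names are hypothetical, and the numbers in the note that follows are illustrative assumptions rather than measurements.

```c
#include <assert.h>
#include <stddef.h>

/* Estimated time (seconds) to send one message of `bytes` bytes,
   assuming a fixed per-message latency plus a per-byte cost.
   Both parameters are illustrative assumptions, not measured values. */
double message_cost(double latency, double per_byte, size_t bytes){
   assert(latency >= 0.0 && per_byte >= 0.0);
   return latency + per_byte * (double)bytes;
}

/* Cost of sending n doubles one at a time (n separate messages). */
double cost_in_loop(double latency, double per_byte, size_t n){
   return (double)n * message_cost(latency, per_byte, sizeof(double));
}

/* Cost of sending the same n doubles as a single vector. */
double cost_as_vector(double latency, double per_byte, size_t n){
   return message_cost(latency, per_byte, n * sizeof(double));
}
```

With an assumed latency of 1e-6 s and 1e-9 s per byte, cost_in_loop for 1000 doubles comes to about 1.0e-3 s, while cost_as_vector comes to about 9e-6 s; the per-message start-up cost dominates the loop.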

Below we will introduce an example where the root broadcasts how many random numbers each processor should create; these vectors of random numbers are then sent back to the root for mean calculation.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int main(){
int size, rank,i,j;
MPI_Init(NULL,NULL);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
double randSum = 0;
int numRands;
srand(rank+1);
if (rank == 0){
   numRands = 5;
}
/* every rank, including the root, makes the same MPI_Bcast call */
MPI_Bcast(&numRands,1,MPI_INT,0,MPI_COMM_WORLD);
double *myRand = calloc(numRands,sizeof(double));
for (i =0;i<numRands;++i){
   myRand[i] = (double)rand()/(double)RAND_MAX;
}
if (rank == 0){
   for (i=0;i<size;i++){
      printf("root received from rank %d the vector: ",i);
      if (i > 0){
         MPI_Recv(myRand, numRands, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      }
      for (j=0;j<numRands;j++){
         printf("%f ",myRand[j]);
         randSum = randSum + myRand[j];
      }
      printf("\n");
   }
   printf("Mean random number = %f\n",randSum/(size*numRands));
}
else{
   MPI_Send(myRand, numRands, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
}
free(myRand);
MPI_Finalize();
return 0;
}

We have now used a new MPI function:

MPI_Bcast(data address, number of elements, MPI data type, processor source, communicator) broadcasts from the processor source (in our case the root) to all other processors. Readers should note the common mistake of using MPI_Recv instead of MPI_Bcast to receive the data; MPI_Bcast is the function used both to send and to receive the data, so every processor in the communicator must call it.

Another simple but common mistake that readers should note is the passing of dynamically sized data; note how myRand is sent without the & address operator (because the variable itself is an address) while numRands is sent with the & operator.
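The distinction between passing myRand and &numRands is ordinary C rather than anything specific to MPI: a buffer argument expects the address of the first element. The sketch below uses a hypothetical helper, sum_buffer, that stands in for any function taking a buffer and an element count; it is not an MPI function, and demo_scalar_and_vector is likewise made up for illustration.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a function that, like the MPI calls,
   takes the address of the first element and an element count. */
double sum_buffer(const double *buf, int count){
   assert(buf != NULL && count >= 0);
   double total = 0.0;
   for (int i = 0; i < count; i++){
      total += buf[i];
   }
   return total;
}

/* Sums one scalar and one dynamically allocated vector through the
   same interface, to show where the & operator is needed. */
double demo_scalar_and_vector(void){
   double single = 2.5;                        /* a plain variable */
   double *vector = calloc(3, sizeof(double)); /* already an address */
   if (vector == NULL){
      return -1.0;                             /* allocation failed */
   }
   vector[0] = 1.0; vector[1] = 2.0; vector[2] = 3.0;

   double a = sum_buffer(&single, 1); /* scalar: take its address */
   double b = sum_buffer(vector, 3);  /* pointer: pass it directly */

   free(vector);
   return a + b;
}
```

Passing &vector instead would hand over the address of the pointer variable rather than the data it points to. Because the MPI buffer argument is a void *, the compiler will accept that without complaint, which is what makes the mistake easy to miss.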

Concluding remarks

This tutorial should set you up to use much of the MPI functionality you need to parallelise your code. Some natural questions that may have arisen while reading this tutorial that we did not cover:

MPI_Barrier – while MPI_Send/Recv/Bcast require processors to ‘catch up’ with each other as a side effect of communicating, some situations involve no communication to force this, such as writing and reading data to files (particularly if a processor must read data written by another processor). MPI_Barrier makes every processor wait until all processors in the communicator have reached it.

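tags – the ‘tag’ option for MPI_Send/Recv is an integer label attached to a message. You can use it to carry metadata that can be described by integers (e.g. vector length or MPI data type), and a receive only matches a message with the same tag (or any tag, if MPI_ANY_TAG is used).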

MPI_Status – this structure can contain details about the data received (source rank and tag, with the length of the message available through MPI_Get_count), although much of the time this will be known in advance. Since receiving the status can be expensive, MPI_STATUS_IGNORE is used to suppress the status structure.

All of the MPI functions described in this tutorial are only a subset of those available that I have found useful in parallelizing my current applications. An exhaustive list can be found at: http://www.mpich.org/static/docs/latest/. If you want to go beyond the functions described in this post (or you require further detail) I would recommend: http://mpitutorial.com/tutorials/.

Part (1/2): https://waterprogramming.wordpress.com/2017/07/21/developing-parallelised-code-with-mpi-for-dummies-in-c-part-12/

Developing parallelised code with MPI for dummies, in C (Part 1/2)

Parallel computing allows for faster implementation of a code by enabling the simultaneous execution of multiple tasks. Before we dive in to how parallelisation of a code is achieved, let’s briefly review the components that make up a high performance computing (HPC) cluster (it should be noted that you can parallelise code on your own computer, but this post will focus on parallelisation on clusters).  High performance computing clusters are usually comprised of a network of individual computers known as nodes that function together as a single computing resource as shown in Figure 1. Each node has some number of processors (the chip within a node that actually executes instructions) and modern processors may contain multiple cores, each of which can execute operations independently. Processors performing tasks on the same node have access to shared memory, meaning they can write and reference the same memory locations as they execute tasks. Memory is not shared between nodes however, so operations that run on multiple nodes use what’s known as distributed-memory programming. In order to properly manage tasks using distributed memory, nodes must have a way to pass information to each other.

Figure 1: One possible configuration of a HPC cluster, based on the Cornell CAC presentation linked in the following paragraph.

Parallelization is commonly performed using OpenMP or MPI. OpenMP (which stands for Open Multi-Processing) parallelises operations by multithreading, running tasks on multiple cores/units within a single node using shared memory. MPI (which stands for Message Passing Interface) parallelises tasks by distributing them over multiple nodes (also possible over multiple processors) within a network, utilizing the distributed memory of each node. This has two practical implications: MPI is needed to scale to more nodes, but communication between tasks is harder. The two parallelisation methods are not mutually exclusive; you could use OpenMP to parallelise operations on individual network nodes and MPI to communicate between nodes on the network (example: http://www.slac.stanford.edu/comp/unix/farm/mpi_and_openmp.html). Both OpenMP and MPI support your favourite languages (C, C++, FORTRAN, Python, but not Java – perfect!). The remainder of this post will focus on implementing MPI in your code. For references on using OpenMP, see this presentation by the Cornell Center for Advanced Computing: https://www.cac.cornell.edu/education/training/ParallelMay2012/ProgOpenMP.pdf.

So how does MPI work?

MPI creates multiple instances of your executable and runs one on each processor you have specified for use. These processors can communicate with each other using specific MPI functions. I will explain a few of the more basic functions in this post.

What does MPI code look like?

A parallel loop (compiles with: mpicc -O3 -o exampleParallel.exe exampleParallel.c -lm)

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int main(){ 
int size, rank; 
MPI_Init(NULL,NULL);    
MPI_Comm_size(MPI_COMM_WORLD, &size); 
MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
printf("I evaluated rank = %d\n",rank); 
MPI_Finalize(); 
return 0;
}

The parallel loop can be distributed over 10 processors (ppn) in one node and submitted to the Cube as a Unix script:

#!/bin/bash
#PBS -N exampleParallel
#PBS -l nodes=1:ppn=10
#PBS -l walltime=0:00:05
#PBS -j oe
#PBS -o output
cd $PBS_O_WORKDIR
mpirun ./exampleParallel.exe

This can be contrasted with a serial loop (compiles with: gcc -O3 -o exampleSerial.exe exampleSerial.c):

#include <stdio.h>
#include <stdlib.h>
int main(){ 
int rank, size = 10; 
for (rank = 0; rank < size; rank++){ 
   printf("I evaluated rank = %d\n",rank);
} 
return 0;}

Let’s have a look at what each line of the parallel code is doing:

MPI_Init is the first MPI function that must be called in a piece of code in order to initialize the MPI environment. NOTE! MPI_Init does not signify the start of a ‘parallel section’. As I said earlier, MPI does not have parallel sections; it runs multiple instances of the same executable in parallel.

MPI_Comm_size populates an integer address with the number of processors in a group, in our example, 10 (i.e. num nodes * processors per node).

MPI_Comm_rank populates an integer address with the processor number for the current instance of the executable. This ‘rank’ variable is the main way to differentiate between different instances of your executable; it is equivalent to the loop counter in the serial version.

MPI_Finalize is the last MPI function that must be called in a piece of code; it terminates the MPI environment. As far as I can tell, it is recommended to have only return statements after MPI_Finalize.

This simple example highlights the difference between MPI and serial code: each instance of the executable is evaluated separately, in parallel. While this makes MPI hard to code, and sharing data between parallel processes expensive, it also makes it much easier to distribute across processors.
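Because the rank plays the role of a loop counter, one common pattern is to use it to decide which share of a larger loop each instance handles. The sketch below is plain C with no MPI calls; block_range is a hypothetical helper, and rank and size are passed in as ordinary integers where a real program would obtain them from MPI_Comm_rank and MPI_Comm_size.

```c
#include <assert.h>

/* Hypothetical helper: split the iterations 0..n-1 into contiguous
   blocks, one per rank. The first (n % size) ranks get one extra
   iteration. Writes the half-open range [*start, *end). */
void block_range(int n, int size, int rank, int *start, int *end){
   assert(size > 0 && rank >= 0 && rank < size && n >= 0);
   int base = n / size;   /* iterations every rank receives */
   int extra = n % size;  /* leftover iterations to hand out */
   *start = rank * base + (rank < extra ? rank : extra);
   *end = *start + base + (rank < extra ? 1 : 0);
}
```

For 10 iterations on 3 instances this gives the ranges [0, 4), [4, 7) and [7, 10); with 10 iterations on 10 instances, each rank handles exactly the iteration equal to its rank, which matches the example above.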

Next week we will present examples demonstrating how to send data between nodes and introduce serial sections of code.

Part (2/2): https://waterprogramming.wordpress.com/2017/07/28/developing-parallelised-code-with-mpi-for-dummies-in-c-part-22/

References (each of these is a useful link if you would like to learn more about parallel computing and HPC):

Parallel Programming Concepts and High-Performance Computing, a module in the Cornell Virtual Workshop

CAC Glossary of HPC Terms: https://cvw.cac.cornell.edu/main/glossary

Algorithm Diagnostics Walkthrough using the Lake Problem as an example (Part 3 of 3: Metrics-based analysis of algorithm performance)

Now that you have your desired metrics based on part 2 of this series, it is possible to gain more insight into your algorithm performance. When I performed this analysis for the actual study, I used the AWRAnalysis.java, Analysis_Attainment_LakeProblem.sh and HypervolumeEval.java files found in the Github repository as explained in the README. I later discovered it was possible to do this within the framework, so that option will be discussed here.

It is possible to calculate the hypervolume of a Pareto Approximate Front within the framework using the SetHypervolume class. For more information on the MOEAFramework classes, please see the following link (http://moeaframework.org/javadoc/index.html).

I used the following command: (Note the change to version 2.3 because I reran this command today to check I remembered it correctly although it seems there is now a version 2.4. It is always best to use the newest version.)


java -cp MOEAFramework-2.3-Demo.jar org.moeaframework.analysis.sensitivity.SetHypervolume myLake4ObjStoch.reference -e 0.01,0.01,0.0001,0.0001 myLake4ObjStoch.reference

This returns a hypervolume value between 0 and 1 that is useful for threshold calculations as shown below.

To calculate threshold attainments, use the Analysis class. Below is an example of performing attainment analysis within the framework instead of using AWRAnalysis.java.  This approach generates a huge number of files, which are best understood when plotted, a subject for a future post.


#!/bin/bash
#source setup_LTM.sh

dim=4
problem=myLake4ObjStoch
epsilon="0.01,0.01,0.0001,0.0001"

algorithms="Borg eMOEA eNSGAII GDE3 MOEAD NSGAII"
seeds="1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50"
percentiles="`seq 1 1 100`"
thresholds=(`seq 0.01 0.01 1.0`)

#compute averages across metrics
#echo "Computing averages across metrics..."
#for algorithm in ${algorithms}
#do
# echo "Working on: " ${algorithm}
# java -classpath `cygpath -wp $CLASSPATH` org.moeaframework.analysis.sensitivity.MetricFileStatistics --mode average --output $WORK/metrics/${algorithm}_${problem}.average $WORK/metrics/${algorithm}_${problem}_*.metrics
#done
#echo "Done!"

#compute search control metrics (for best and attainment)
echo "Computing hypervolume search control metrics..."
for algorithm in ${algorithms}
do
 echo "Working on: " ${algorithm}
 counter=$1
 for percentile in ${percentiles}
 do
 java -classpath MOEAFramework-2.3-Demo.jar org.moeaframework.analysis.sensitivity.Analysis --parameterFile ./${algorithm}_params.txt --parameters ./${algorithm}_Latin --metric 0 --threshold ${thresholds[$counter]} --hypervolume 0.7986 ./SOW6/metrics/average_replace_NaNs/${algorithm}_${problem}.average > ./test/Hypervolume_${percentile}_${algorithm}.txt
 counter=$((counter+1))
 done
 done
echo "Done!"

echo "Computing generational distance search control metrics..."
for algorithm in ${algorithms}
do
 echo "Working on: " ${algorithm}
 counter=$1
 for percentile in ${percentiles}
 do
 java -classpath MOEAFramework-2.3-Demo.jar org.moeaframework.analysis.sensitivity.Analysis --parameterFile ./${algorithm}_params.txt --parameters ./${algorithm}_Latin --metric 1 --threshold ${thresholds[$counter]} ./SOW6/metrics/average_replace_NaNs/${algorithm}_${problem}.average > ./test/GenDist_${percentile}_${algorithm}.txt
 counter=$((counter+1))
 done
done
echo "Done!"

echo "Computing additive epsilon indicator search control metrics..."
for algorithm in ${algorithms}
do
 echo "Working on: " ${algorithm}
 counter=$1
 for percentile in ${percentiles}
 do
 java -classpath MOEAFramework-2.3-Demo.jar org.moeaframework.analysis.sensitivity.Analysis --parameterFile ./${algorithm}_params.txt --parameters ./${algorithm}_Latin --metric 4 --threshold ${thresholds[$counter]} ./SOW6/metrics/average_replace_NaNs/${algorithm}_${problem}.average > ./test/EpsInd_${percentile}_${algorithm}.txt
 counter=$((counter+1))
 done
done
echo "Done!"

I did encounter some caveats while working through this process. Scripts for handling them and instructions are provided in the Diagnostic-Source README on Github. One caveat that is not covered there is increasing the speed of the hypervolume calculation. Please see Dave Hadka’s Hypervolume repository for more information (https://github.com/dhadka/Hypervolume).

Algorithm Diagnostics Walkthrough using the Lake Problem as an example (Part 2 of 3: Calculate metrics for Analysis) Tori Ward

This post continues from Part 1, which provided examples of using the MOEAFramework to generate Pareto approximate fronts for a comparative diagnostic study.

Once one has finished generating all of the approximate fronts and respective reference sets one hopes to analyze, metrics may be calculated within the MOEAFramework. I calculated metrics for both my local reference sets and all of my individual approximations of the Pareto front. The metrics for the individual approximations were averaged for each parameterization across all seeds to determine the expected performance for a single seed.
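The averaging step is simple arithmetic, and a small sketch may make the data layout clearer. The code below is an illustration in C, not part of the MOEAFramework; average_across_seeds is a hypothetical function, and it assumes that every seed reports one value per parameterization with no missing entries.

```c
#include <assert.h>
#include <stddef.h>

/* Average one metric across seeds for each parameterization.
   metrics is laid out row by row: metrics[seed * nparams + param].
   avg must have room for nparams values.
   Assumes every entry is a valid number (no missing values). */
void average_across_seeds(const double *metrics, int nseeds,
                          int nparams, double *avg){
   assert(metrics != NULL && avg != NULL && nseeds > 0 && nparams > 0);
   for (int p = 0; p < nparams; p++){
      double total = 0.0;
      for (int s = 0; s < nseeds; s++){
         total += metrics[(size_t)s * nparams + p];
      }
      avg[p] = total / nseeds;
   }
}
```

The result has one value per parameterization, which is what is meant by the expected performance for a single seed.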

Calculate Metrics

Local Reference Set Metrics

#!/bin/bash

NSAMPLES=50
NSEEDS=50
METHOD=Latin
PROBLEM=myLake4ObjStoch
ALGORITHMS=( NSGAII GDE3 eNSGAII MOEAD eMOEA Borg)

SEEDS=$(seq 1 ${NSEEDS})
JAVA_ARGS="-cp MOEAFramework-2.1-Demo.jar"
set -e

for ALGORITHM in ${ALGORITHMS[@]}
do
NAME=${ALGORITHM}_${PROBLEM}
PBS="\
#PBS -N ${NAME}\n\
#PBS -l nodes=1\n\
#PBS -l walltime=96:00:00\n\
#PBS -o output/${NAME}\n\
#PBS -e error/${NAME}\n\
cd \$PBS_O_WORKDIR\n\
java ${JAVA_ARGS} \
org.moeaframework.analysis.sensitivity.ResultFileEvaluator \
--b ${PROBLEM} --i ./SOW4/${ALGORITHM}_${PROBLEM}.reference \
--r ./SOW4/reference/${PROBLEM}.reference --o ./SOW4/${ALGORITHM}_${PROBLEM}.localref.metrics"
echo -e $PBS | qsub
done

Individual Set Metrics

#!/bin/bash

NSAMPLES=50
NSEEDS=50
METHOD=Latin
PROBLEM=myLake4ObjStoch
ALGORITHMS=( NSGAII GDE3 eNSGAII MOEAD eMOEA Borg)

SEEDS=$(seq 1 ${NSEEDS})
JAVA_ARGS="-cp MOEAFramework-2.1-Demo.jar"
set -e

for ALGORITHM in ${ALGORITHMS[@]}
do
for SEED in ${SEEDS}
do
NAME=${ALGORITHM}_${PROBLEM}_${SEED}
PBS="\
#PBS -N ${NAME}\n\
#PBS -l nodes=1\n\
#PBS -l walltime=96:00:00\n\
#PBS -o output/${NAME}\n\
#PBS -e error/${NAME}\n\
cd \$PBS_O_WORKDIR\n\
java ${JAVA_ARGS} \
org.moeaframework.analysis.sensitivity.ResultFileEvaluator \
--b ${PROBLEM} --i ./SOW4/sets/${ALGORITHM}_${PROBLEM}_${SEED}.set \
--r ./SOW4/reference/${PROBLEM}.reference --o ./SOW4/metrics/${ALGORITHM}_${PROBLEM}_${SEED}.metrics"
echo -e $PBS | qsub
done
done

Average Individual Set Metrics across seeds for each parameterization

#!/bin/bash
#PBS -l nodes=1:ppn=1
#PBS -N moeaevaluations
#PBS -j oe
#PBS -l walltime=96:00:00

cd "$PBS_O_WORKDIR"

NSAMPLES=50
NSEEDS=50
METHOD=Latin
PROBLEM=myLake4ObjStoch
ALGORITHMS=( NSGAII GDE3 eNSGAII MOEAD eMOEA Borg)

SEEDS=$(seq 1 ${NSEEDS})
JAVA_ARGS="-cp MOEAFramework-2.1-Demo.jar"
set -e

# Average the performance metrics across all seeds
for ALGORITHM in ${ALGORITHMS[@]}
do
echo -n "Averaging performance metrics for ${ALGORITHM}..."
java ${JAVA_ARGS} \
org.moeaframework.analysis.sensitivity.SimpleStatistics \
-m average --ignore -o ./metrics/${ALGORITHM}_${PROBLEM}.average ./metrics/${ALGORITHM}_${PROBLEM}_*.metrics
echo "done."
done

At the end of this script, I also calculated the set contribution I mentioned earlier by including the following lines.

# Calculate set contribution
echo ""
echo "Set contribution:"
java ${JAVA_ARGS} org.moeaframework.analysis.sensitivity.SetContribution \
-e 0.01,0.01,0.001,0.01 -r ./reference/${PROBLEM}.reference ./reference/*_${PROBLEM}.combined

Part 3 covers using the MOEAFramework for further analysis of these metrics.