Parallel Programming Part 2: Demonstrating OpenMP constructs with a simple test case

In Parallel Programming Part 1, I introduced a number of basic OpenMP directives and constructs, as well as provided a quick guide to high-performance computing (HPC) terminology. In Part 2 (aka this post), we will apply some of the constructs previously learned to execute a few OpenMP examples. We will also view an example of incorrect use of OpenMP constructs and its implications.

TheCube Cluster Setup

Before we begin, there are a couple of steps to ensure that we will be able to run C++ code on the Cube. First, check the default version of the C++ compiler. This compiler essentially transforms your code into an executable file that can be easily understood and implemented by the Cube. You can do this by entering

g++ --version

into your command line. You should see that the Cube has G++ version 8.3.0 installed, as shown below.

Now, create two files:

  • openmp-demo-print.cpp: this is the file where we will perform a print demo to show how tasks are distributed between threads
  • openmp-demo-access.cpp: this file will demonstrate the importance of synchronization and the implications of failing to implement it properly

Example 1: Hello from Threads

Open the openmp-demo-print.cpp file, and copy and paste the following code into it:

#include <iostream>
#include <omp.h>
#include <stdio.h>

int main() {

    int num_threads = omp_get_max_threads(); // get max number of threads available on the Cube
    omp_set_num_threads(num_threads);

    printf("The CUBE has %d threads.\n", num_threads);


    #pragma omp parallel 
    {
        int thread_id = omp_get_thread_num();
        printf("Hello from thread num %d\n", thread_id);
    }

    printf("Thread numbers written to file successfully. \n");

    return 0;
}

Here is what the code is doing:

  • Lines 1 to 3 are the libraries that need to be included in this script to enable the use of OpenMP constructs and printing to the command line.
  • In main() on Lines 7 and 8, we first get the maximum number of threads that the Cube has available on one node and assign it to the num_threads variable. We then set the number of threads we want to use.
  • Next, we declare a parallel section using #pragma omp parallel beginning on Line 13. In this section, we assign the printf() function to each of the num_threads threads that we set on Line 8. Each thread now executes the printf() call independently of the others. Note that the threads are under no obligation to execute their tasks in ascending order of thread number.
  • Finally, we end the call to main() by returning 0.

Be sure to first save this file. Once you have done this, enter the following into your command line:

g++ openmp-demo-print.cpp -o openmp-demo-print -fopenmp

This tells the system to compile openmp-demo-print.cpp and create an executable object using the same file name. Next, in the command line, enter:

./openmp-demo-print

You should see the following (or some version of it) appear in your command line:
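
(For illustration, assume the node reports 8 threads; your thread count and print order will differ.)

The CUBE has 8 threads.
Hello from thread num 2
Hello from thread num 0
Hello from thread num 5
Hello from thread num 7
Hello from thread num 1
Hello from thread num 4
Hello from thread num 6
Hello from thread num 3
All thread IDs printed successfully.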

Notice that the thread numbers are not printed in order. This demonstrates that each thread takes a different amount of time to complete its task. If you re-run the script, you will find that the order in which the threads print changes, showing that there is a degree of uncertainty in how much time each thread requires. This has implications for the order in which threads read, write, and access data, which will be demonstrated in the next example.

Example 2: Whose (seg)Fault Is It?

In this example, we will execute a script in which each thread stores its thread ID number in a shared vector called data. See the implementation below:

#include <iostream>
#include <omp.h>
#include <vector>

using namespace std;

void parallel_access_demo() {
    vector<int> data;
    int num_threads = omp_get_max_threads();

    // Parallel region for appending elements to the vector
    #pragma omp parallel for
    for (int i = 0; i < num_threads; ++i) {
        // Append the thread ID to the vector
        data.push_back(omp_get_thread_num()); // Potential race condition here
    }
}

int main() {
    // Execute parallel access demo
    parallel_access_demo();
    printf("Code ran successfully. \n");

    return 0;
}

As per the usual, here’s what’s going down:

  • After including the necessary libraries, we add using namespace std on Line 5 so that we can print text without having to prepend std:: to every output statement
  • Starting on Line 7, we write a simple parallel_access_demo() function that does the following:
    • Initialize a vector of integers very creatively called data
    • Get the maximum number of threads available per node on the Cube
    • Declare a parallel for region where we iterate through all threads and push_back the thread ID number into the data vector
  • Finally, we execute this function in main() beginning on Line 19.

If you execute this using a similar process shown in Example 1, you should see the following output in your command line:
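
(The exact message varies by system and run, but a typical crash looks like the following.)

Segmentation fault (core dumped)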

Congratulations – you just encountered your first segmentation fault! Not to worry – this is by design. Take a few minutes to review the code above to identify why this might be the case.

Notice Line 15, where each thread attempts to insert a new integer into the data vector. Since push_back() is being called in a parallel region, multiple threads attempt to modify data simultaneously. This results in a race condition, where the size of data changes unpredictably. There is a pretty simple fix: add a #pragma omp critical directive before Line 15.

As mentioned in Part 1, #pragma omp critical ensures that each thread executes its task one at a time, and not all at once. It essentially serializes a portion of the parallel for loop and prevents race conditions by ensuring that the size of data changes in a predictable and controlled manner. An alternative would be to declare #pragma omp single in place of #pragma omp parallel for on Line 12. This delegates the following for-loop to a single thread, forcing the loop to be completed in serial. Both of these options will result in the blissful outcome of a clean run that prints “Code ran successfully.”
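
As a sketch, the first fix would make parallel_access_demo() look something like this:

#include <omp.h>
#include <vector>

using namespace std;

void parallel_access_demo() {
    vector<int> data;
    int num_threads = omp_get_max_threads();

    // Parallel region for appending elements to the vector
    #pragma omp parallel for
    for (int i = 0; i < num_threads; ++i) {
        // Only one thread at a time may modify the shared vector
        #pragma omp critical
        {
            data.push_back(omp_get_thread_num());
        }
    }
}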

Summary

In this post, I introduced two examples to demonstrate the use of OpenMP constructs. In Example 1, I demonstrated a parallel printing task to show that threads do not all complete their tasks at the same time. Example 2 showed how the lack of synchronization can lead to segmentation faults due to race conditions, and two methods to remedy this. In Part 3, I will introduce GPU programming using the Python PyTorch library and Google Colaboratory cloud infrastructure. Until then, happy coding!

Parallel Programming Part 1: A brief introduction to OpenMP directives and constructs

In my recent Applied High Performance Computing (HPC) lectures, I have been learning to better utilize OpenMP constructs and functions. If these terms mean little to nothing to you, welcome to the boat – it’ll be a collaborative learning experience. For full disclosure, this blog post assumes some basic knowledge of parallel programming and its related terminology. But just in case, here is a quick glossary:

  • Node: One computing unit within a supercomputing cluster. Can be thought of as one CPU unit on a traditional desktop/laptop.
  • Core: The physical processing unit within the CPU that enables computations to be executed. For example, an 11th-Gen Intel i5 CPU has four cores.
  • Processors/threads: Simple, lightweight “tasks” that can be executed within a core. Using the same example as above, this Intel i5 CPU has eight threads, indicating that each of its cores can run two threads simultaneously.
  • Shared memory architecture: This means that all cores share the same memory within a CPU, and are controlled by the same operating system. This architecture is commonly found in your run-of-the-mill laptops and desktops.
  • Distributed memory architecture: Multiple cores run their processes independently of each other. They communicate and exchange information via a network. This architecture is more commonly found in supercomputer clusters.
  • Serial execution: Tasks and computations are completed one at a time. Typically the safest and most error-resistant form of executing tasks, but is the least efficient.
  • Parallel execution: Tasks and computations are completed simultaneously. Faster and more efficient than serial execution, but runs the risk of errors where tasks are completed out of order.
  • Synchronization: The coordination of multiple tasks or computations to ensure that all processes align with each other and abide by a certain order of execution. Synchronization takes time away from computation: an overly-cautious code execution with too many synchronization points can result in unnecessary slowdown. However, avoiding synchronization altogether can result in erroneous output due to out-of-order computations.
  • Race condition: A race condition occurs when two threads attempt to access the same location on the shared memory at the same time. This is a common problem when attempting parallelized tasks on shared memory architectures.
  • Load balance: The distribution of work (tasks, computations, etc) across multiple threads. OpenMP implicitly assumes equal load balance across all its threads within a parallel region – an assumption that may not always hold true.

This is intended to be a two-part post where I will first elaborate on the OpenMP API and how to use basic OpenMP clauses to facilitate parallelization and code speedup. In the second post, I will provide a simple example where I will use OpenMP to delineate my code into serial and parallel regions, force serialization, and synchronize memory accesses and writes to prevent errors caused by writing to the same memory location simultaneously.

What is OpenMP?

OpenMP stands for “Open Specification for Multi-Processing”. It is an application programming interface (API) that enables C/C++ or Fortran code to “talk” (or interface) with multiple threads within a computing node on shared memory architectures, therefore allowing parallelism. It should not be confused with MPI (Message Passing Interface), which is used to develop parallel programs on distributed memory systems (for a detailed comparison, please refer to this set of Princeton University slides here). Unlike a standalone coding language like C/C++, Python, or R, OpenMP relies on a fixed set of simple directives with lightweight syntax (#pragma omp [clause 1] [clause 2]). To begin using OpenMP, you should first understand what it can and cannot do.

Things you can do with OpenMP

Explicit delineation of serial and parallel regions

One of the first tasks that OpenMP excels at is the explicit delineation of your code into serial and parallel regions. For example:

#include <stdio.h>
#include <omp.h>
void some_function() {
	#pragma omp parallel
	{
		do_a_parallel_task();
	}
	do_a_serial_task();
}

In the code block above, we are delineating an entire parallel region where the task performed by the do_a_parallel_task() function is performed independently by each thread on a single node. The number of tasks that can be completed at any one time depends on the number of threads that are available on a single node.

Hide stack management

In addition, using OpenMP allows you to automate memory allocation, or “hide stack management”. Most machines are able to read, write, and (de)allocate data to optimize performance, but often some help from the user is required to avoid errors caused by multiple memory accesses or writes. OpenMP allows the user to automate stack management without explicitly specifying memory reads or writes in certain cache locations via straightforward clauses such as collapse() (we will delve into the details of such clauses in later sections of this blog post).
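
As a quick sketch of the collapse() clause mentioned above (a minimal example assuming a doubly-nested loop over independent iterations; the function and variable names are illustrative):

#include <omp.h>

void fill_grid(int n, int m, double *grid) {
	// collapse(2) merges both loop levels into a single iteration space,
	// so OpenMP can distribute all n*m iterations across the thread team
	#pragma omp parallel for collapse(2)
	for (int i = 0; i < n; i++) {
		for (int j = 0; j < m; j++) {
			grid[i * m + j] = i + j; // each iteration writes its own element
		}
	}
}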

Allow synchronization

OpenMP also provides synchronization constructs to enforce serial operations or wait times within parallel regions to avoid conflicting or simultaneous read or write operations where doing so might result in erroneous memory access or segmentation faults. For example, a sequential task (such as a memory write to update the value in a specific location) that is not carefully performed in parallel could result in output errors or unexpected memory accessing behavior. Synchronization ensures that multiple threads have to wait until the last operation on the last thread is completed, or that only one thread is allowed to perform an operation at a time. It also protects access to shared data so that threads do not read a value while another thread is updating it. All this can be achieved via the critical, barrier, or single OpenMP clauses.

Creating a specific number of threads

Unless explicitly stated, the system will automatically select the number of threads (or processors) to use for a specific computing task. Note that while you can request a certain number of threads in your environment using export OMP_NUM_THREADS, the number of threads assigned to a specific task is set by the system unless otherwise specified. OpenMP allows you to control the number of threads you want to use in a specific task in one of two ways: (a) using omp_set_num_threads() and (b) using the num_threads() clause, e.g. #pragma omp parallel num_threads(4). The former sets an upper limit on the number of threads to be freely assigned to tasks throughout the code, while the latter specifies a set number of threads to perform an operation(s) within a parallel region.
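
A minimal sketch contrasting the two approaches (the thread counts here are arbitrary):

#include <stdio.h>
#include <omp.h>

void thread_count_demo() {
	// (a) cap the number of threads for all subsequent parallel regions
	omp_set_num_threads(8);

	#pragma omp parallel
	{
		// omp_get_num_threads() reports the size of the current team (here, up to 8)
		if (omp_get_thread_num() == 0) {
			printf("Team size: %d\n", omp_get_num_threads());
		}
	}

	// (b) request a team of exactly 4 threads for this parallel region only
	#pragma omp parallel num_threads(4)
	{
		printf("Hello from thread %d\n", omp_get_thread_num());
	}
}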

In the next section, we will list some commonly-used and helpful OpenMP constructs and provide examples of their use cases.

OpenMP constructs and their use cases

OMP_NUM_THREADS
This is an environment variable that can be set either in the section of your code where you defined your global variables, or in the SLURM submission script you are using to execute your parallel code.

omp_get_num_threads()
This function returns the number of threads currently executing the parallel region from which it is called. Use this to gauge how many active threads are being used within a parallel region.

omp_set_num_threads()
This function specifies the number of threads to use for any following parallel regions. It overrides the OMP_NUM_THREADS environment variable. Be careful to not exceed the maximum number of threads available across all your available cores. You will encounter an error otherwise.

omp_get_max_threads()
To avoid potential errors from allocating too many threads, you can use omp_set_num_threads() with omp_get_max_threads(). This returns the maximum number of threads that can be used to form a new “team” of threads in a parallel region.
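
A minimal sketch of that pattern (this mirrors the snippet used in Example 1 of Part 2):

#include <omp.h>

void use_all_available_threads() {
	// query the largest team size the runtime will allow, then request exactly that
	int max_threads = omp_get_max_threads();
	omp_set_num_threads(max_threads);
}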

#pragma omp parallel
This directive instructs your compiler to create a parallel region within which it instantiates a set (team) of threads to complete the computations within it. Without any further specifications, the parallelization of the code within this block will be automatically determined by the compiler.

#pragma omp for
Use this directive to instruct the compiler to parallelize for-loop iterations within the team of threads that it has instantiated. This directive is often used in combination with #pragma omp parallel to form the #pragma omp parallel for construct which both creates a parallel region and distributes tasks across all threads.

To demonstrate:

#include <stdio.h>
#include <omp.h>
void some_function() {
	#pragma omp parallel
	{
		#pragma omp for
		for (int i = 0; i < some_num; i++) {
			perform_task_here();
		}
	}
}

is equivalent to

#include <stdio.h>
#include <omp.h>
void some_function() {
	#pragma omp parallel for
	for (int i = 0; i < some_num; i++) {
		perform_task_here();
	}
}

#pragma omp barrier
This directive marks a synchronization point. A synchronization point can be thought of as a “gate” at which all threads must wait until every thread has completed its individual computations. The code in the parallel region beyond omp barrier will not be executed until all threads before it have completed their explicit tasks.
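
A minimal sketch of a barrier in action (assuming, for illustration, a team of 8 threads):

#include <stdio.h>
#include <omp.h>

void barrier_demo() {
	int partial[8] = {0};

	#pragma omp parallel num_threads(8)
	{
		int tid = omp_get_thread_num();
		partial[tid] = tid * tid; // each thread fills its own slot

		// no thread proceeds past this point until every slot has been written
		#pragma omp barrier

		if (tid == 0) {
			int total = 0;
			for (int i = 0; i < 8; i++) total += partial[i];
			printf("Sum of partial results: %d\n", total);
		}
	}
}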

#pragma omp single
This is the first of three synchronization directives we will explore in this blog post. This directive identifies a section of code, typically contained within “{ }”, that can only be run with one thread. This directive enforces serial execution, which can improve accuracy but decrease speed. The omp single directive imposes an implicit barrier where all threads after #pragma omp single will not be executed until the single thread running its tasks is done. This barrier can be removed by forming the #pragma omp single nowait construct.

#pragma omp master
This directive is similar to the #pragma omp single directive, but chooses only the master thread (the thread responsible for creating, managing, and discarding all other threads) to run the task. Unlike #pragma omp single, it does not have an implicit barrier, resulting in faster implementations. Both #pragma omp single and #pragma omp master will result in running a section of code only once, and are useful for controlling sections of code that require printing statements or signaling.

#pragma omp critical
A section of code immediately following this directive can only be executed by one thread at a time. It should not be confused with #pragma omp single. The former requires that the code be executed by one thread at a time, and it will be executed as many times as there are threads in the team (as set by omp_set_num_threads()). The latter ensures that its corresponding section of code is executed only once. Use this directive to prevent race conditions and enforce serial execution where one-at-a-time computations are required.
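
To make the distinction concrete, here is a small sketch: the single block runs once in total, while the critical block runs once per thread, one thread at a time:

#include <stdio.h>
#include <omp.h>

void single_vs_critical_demo() {
	int counter = 0;

	#pragma omp parallel
	{
		// executed by exactly one thread, exactly once
		#pragma omp single
		printf("Parallel region entered by %d threads\n", omp_get_num_threads());

		// executed by every thread, but by only one thread at a time
		#pragma omp critical
		counter++;
	}

	printf("counter = %d (one increment per thread)\n", counter);
}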

Things you cannot (and shouldn’t) do with OpenMP

As all good things come to an end, all HPC APIs must also have limitations. In this case, here is a quick rundown in the case of OpenMP:

  • OpenMP is limited to functioning on shared memory machines and should not be run on distributed memory architectures.
  • It also implicitly assumes equal load balance. Incorrectly making this assumption can lead to synchronization delays due to some threads taking significantly longer to complete their tasks compared to others immediately prior to a synchronization directive.
  • OpenMP may have lower parallel efficiency because it functions on shared memory architectures and is eventually limited by Amdahl’s law.
  • It relies heavily on parallelizable for-loops instead of atomic operations.

    Summary

    In this post, we introduced OpenMP and a few of its key constructs. We also walked through brief examples of their use cases, as well as limitations of the use of OpenMP. In our next blog post, we will provide a tutorial on how to write, compile, and run a parallel for-loop on Cornell’s Cube Cluster. Until then, feel free to take a tour through the Cornell Virtual Workshop course on OpenMP to get a head start!

    References

    IBM documentation. (n.d.). https://www.ibm.com/docs/en/xl-c-aix/13.1.3

    Jones, M. D. (2009). Practical issues in OpenMP. University at Buffalo (UB) Department of Computer Science and Engineering. https://cse.buffalo.edu/~vipin/nsf/docs/Tutorials/OpenMP/omp-II-handout.pdf

    Pros and cons of OpenMP. (n.d.). https://www.dartmouth.edu/~rc/classes/intro_openmp/Pros_and_Cons.html

    What is load balancing? – load balancing algorithm explained – AWS. (n.d.). https://aws.amazon.com/what-is/load-balancing/

    Ziskovin, G. (2022, October 29). #pragma OMP single [explained with example]. OpenGenus IQ: Computing Expertise & Legacy. https://iq.opengenus.org/pragma-omp-single/

    Checkpointing and restoring runs with the Borg MOEA

    Introduction

    Simulation-optimization is an increasingly popular tool in many engineering fields. As the name implies, simulation-optimization requires two major components: a simulation model of the system we want to optimize (e.g., a water supply system) and an algorithm for iteratively tuning the decision levers within the system to optimize the simulated performance. For example, Multi-Objective Evolutionary Algorithms (MOEAs), such as the Borg MOEA, have been successfully employed across a variety of challenging stochastic multi-objective simulation-optimization problems in water resources and other fields. However, simulation-optimization can require millions of iterative simulation trials in order to adequately explore the decision and uncertainty spaces, especially under multi-objective framings. Given the ever-increasing size and complexity of simulation models, this can become a significant computational challenge.

    Running the MOEA in parallel on high-performance computing clusters can greatly alleviate this challenge, but not eliminate it. For example, in my last blog post on hybrid parallelization of the Borg MOEA, I gave the example of optimizing infrastructure investments using a simulation model that takes 10 minutes to run a 30-year realization. Let’s assume we run 48 Monte Carlo realizations per function evaluation (FE) to account for natural hydrologic variability. Even using the maximum job size of 128 nodes on the Stampede2 supercomputer, with 48 CPUs each, we will only be able to complete ~20,000 FEs per 24 hours of wall clock. To complete 100,000 FE, a relatively low target compared to other similarly-complex studies, the MOEA will need to run for ~5 days. This raises a major problem: many computing clusters have strict wall clock limits. On Stampede2, for example, each submission can run for a maximum of 48 hours. As a result, we would be limited to ~40,000 FE for a single Stampede2 run, which may be insufficient based on the difficulty of the problem.

    Fortunately, the Borg MOEA’s “checkpointing” and “restoring” functionalities allow us to circumvent this issue. Checkpointing is the process of saving “snapshots” of the MOEA’s internal state at regular intervals, and restoring is the act of reading a snapshot into memory at the beginning of a run and continuing the optimization from that starting point. This allows us to effectively split a 5-day optimization into three sequential submissions, in order to comply with a 2-day wall clock limit. This functionality can also be helpful for iteratively assessing convergence, since we don’t necessarily know a priori how many FE will need to be run.

    In this blog post, I will demonstrate the use of checkpointing and restoring with the Borg MOEA using a less computationally-demanding example: the Python-based DPS Lake Problem. However, the methods and results described here should be applicable to much larger experiments such as the one described above. See my last blog post and references therein for more details on the Python implementation of the DPS Lake Problem.

    Setup

    For this post, I ran the following short proof of concept: first I ran five random seeds of the Borg MOEA for 5 minutes each. Each seed was assigned 2 nodes (16 CPU each) on the Cube cluster at Cornell. Each seed completed approximately 6100 FE in this first round. Next, I ran a second round in which each seed began from the 6100-FE checkpoint and ran for another 5 minutes. All code for this post, along with instructions for downloading the correct branch of the Borg MOEA, can be found in this GitHub repository.

    Setting up checkpointing only takes a few extra lines of code in the file “borg_lake_msmp.py”. First, we need to define the baseline file name used to store the checkpoint files:

    newCheckpointFileBase = 'results/checkpoints/seed' + str(seed) 
    

    Within the simulation, checkpoints will be made at regular intervals. The checkpoint file names will automatically have the FE appended; for example, the checkpoint for seed 5 after 6100 FE is written to “results/checkpoints/seed5_nfe6100.checkpoint”. The write frequency for both checkpoints and runtime metrics can be changed based on the “runtimeFrequency” parameter in “submit_msmp.sh”.

    We can also provide a previous checkpoint to read in at the beginning of the run:

    if maxEvalsPrevious > 0:
        oldCheckpointFile = 'results/checkpoints/seed' + str(seed) + '_nfe' + str(maxEvalsPrevious) + '.checkpoint'

    where “maxEvalsPrevious” is the number of FE for the prior checkpoint that we want to read in. This parameter is input in “submit_msmp.sh”. The previous checkpoint file must be placed in the “checkpoints” folder prior to runtime.

    We provide both checkpoint file names to the Borg MOEA, along with other important parameters such as the maximum number of evaluations.

    result = borg.solveMPI(maxEvaluations=maxEvals, runtime=runtimeFile, frequency=runtimeFreq, newCheckpointFileBase=newCheckpointFileBase, oldCheckpointFile=oldCheckpointFile, evaluationFile=evaluationFile)
    

    The Python wrapper for the Borg MOEA will use this command to initialize and run the MOEA across all available nodes. For more details on the implementation of the experiment, see the GitHub repository above.

    The checkpoint file

    So what information is saved in the checkpoint file? This file includes all internal state variables needed to initialize a new instance of the Borg MOEA that begins where the previous run left off. As an example, I will highlight select information from “seed5_nfe6100.checkpoint”, the last checkpoint file (at 6100 FE) for seed 5 after the first round.

    First, the file lists basic problem formulation information such as the number of objectives, the decision variable bounds, and the epsilon dominance parameters:

    Version: 108000
    Number of Variables: 6
    Number of Objectives: 4
    Number of Constraints: 1
    Lower Bounds: -2 0 0 -2 0 0
    Upper Bounds: 2 2 1 2 2 1
    Epsilons: 0.01 0.01 0.0001 0.0001
    Number of Operators: 6

    Next, it lists current values for several parameters which evolve over the course of the optimization, such as the selection probabilities for the six recombination operators, the number of times the algorithm has triggered a “restart” due to stalled progress, and the number of solutions in the population:

    Operator Selection Probabilities: 0.93103448275862066 0.0086206896551724137 0.017241379310344827 0.025862068965517241 0.0086206896551724137 0.0086206896551724137
    Number of Evaluations: 6100
    Tournament Size: 3
    Window Size: 200
    ...
    Number of Restarts: 0
    ...
    Population Size: 196
    Population Capacity: 196

    The file then lists each of the 196 solutions currently stored in the population:

    Population:
     0.52276605276517329 1.1277550312248441 0.57174902357263913 -0.053873968914707165 1.5491763609435476 0.68045375364276195 -0.2810269062318374 0.18479149938317199 -1 -1 0 0
    ...
     0.27892075855585108 0.45130064043648516 0.77550656566943632 0.31799415686665755 1.0881848609123497 0.60273027430733062 -0.25193459430454329 0.16022123986158276 -0.98989898989898994 -1 0 0
    

    Each line in this section represents a different solution; the numbers represent the 6 decision variables, 4 objectives, and 1 constraint for this problem, as well as the index of the operator used to create each solution.

    Similarly, the file lists the current archive, which is the subset of 64 solutions from the larger population that are non-dominated:

    Archive Size: 64
    Archive:
     0.65953806401594717 1.3878689423124047 0.5978484500419472 0.15387554544946802 1.1931240566680981 0.62731909408989983 -0.32254774259834024 0.21682247784367689 -1 -1 0 0
    ...
     0.28214763078942073 0.44992540083980864 0.77495402511037126 0.1548097106329358 1.0881858398758053 0.6850068814635788 -0.2456560390454271 0.15760082516545762 -0.98989898989898994 -1 0 0
    

    The checkpointing file then lists several parameters associated with the most recently used recombination operators:

    Number of Improvements: 194
    Recency List Size: 50
    Recency List Position: 47
    Recency List: 0 0 0 0 0 0 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
    

    We see that after 6100 FE, the algorithm is almost exclusively using the operator with index 0; this is consistent with operator 0’s 93% selection probability listed earlier in the file.

    Finally, the checkpoint file closes with the current state of the random number generator:

    RNG State Length: 624
    RNG State: 2103772999 2815404811 3893540187 ...
    RNG Index: 200
    Next Gaussian: -0.13978937466755278
    Have Next Gaussian: 0

    Results

    So does it work? Figure 1 shows the convergence of the five seeds according to six different measures of quality for the evolving archive. Performance for all seeds is found to be continuous across the two rounds, suggesting that the checkpointing and restoring is working properly. Additionally, Figure 2 demonstrates the continuity of the auto-adaptive selection probabilities for the Borg MOEA’s different recombination operators.

    Figure 1: Performance of 5 random seeds across 2 rounds, quantified by six different measures of approximate Pareto set quality.
    Figure 2: Adaptive selection probabilities for the Borg MOEA’s six recombination operators.

    The checkpointing process thus allows for continuity of learning across rounds: not only learning about which solutions are most successful (i.e., continuity of the population and archive), but also learning about which strategies for generating new candidate solutions are most successful (i.e., continuity of the recombination operator histories and probabilities). This capability allows for seamlessly sequencing separate “blocks” of function evaluations in situations where wall clock limits are limiting, or where the convergence behavior of the problem is not well understood a priori.

    Easy batch parallelization of code in any language using mpi4py

    The simplest form of parallel computing is what’s known as “embarrassingly” parallel processes. These processes involve fully independent runs of a model or script where little or no communication is needed across parallel processes. A common example is Monte Carlo evaluation, when we run a model over an ensemble of inputs. To parallelize an embarrassingly parallel application we simply need to send a set of commands to the cluster telling it to run each sample on a different core (or set of cores). For small applications, this can be done by submitting each run individually. For larger applications, SLURM Job Arrays (which are nicely detailed in Antonia’s post, here) can efficiently batch large numbers of function calls to independent computing cores. While this method is efficient and effective, I find it can sometimes be hard to keep track of, as you may be submitting tens or hundreds of jobs at a time. An alternative approach to submitting embarrassingly parallel tasks is to utilize MPI with Python to dispatch and organize jobs.

    I like the MPI / Python combo because it consolidates all parallel applications into a single job, meaning you have one job to keep track of on a cluster at a time, and one output file generated by the batch set of model runs. I also find Python slightly easier to edit and debug than Bash scripts (which are used to create job arrays). Additionally, it’s very easy to assign each computing core a set of function evaluations to run (this can also be done with Job arrays, but again, I find Python easier to work with). Though Python is the language used to coordinate parallel tasks, we can use it to parallelize code in any language, as I’ll demonstrate below.

    In this post I’ll first provide some background on MPI and its Python implementation, mpi4py. Next I’ll provide an example I’ve developed to demonstrate how to batch run a Matlab code on a cluster. The examples presented here are derived from some of Bernardo’s code in his post on Parallel programming in C/C++, which you can find here.

    A very light introduction to MPI

    MPI stands for “Message Passing Interface” and is the standard library for distributed memory parallelization (for background, see this post). To understand how MPI works, it’s helpful to define some of its basic components.

    1. Tasks: I’ll use the term task to define a processor (or group of processors) assigned to perform a specific set of instructions. These instructions may be a single evaluation of a function, or a set of function evaluations
    2. Communicators: A communicator is a group of MPI task units that are permitted to communicate with each other. In advanced MPI applications you may have multiple communicators, but for embarrassingly parallel applications we’ll only use one. The default communicator is called “MPI_COMM_WORLD” (I don’t know why, if anyone does please feel free to share in the comments), and that’s what I’ll work with here.
    3. Ranks: Each MPI task is assigned a unique identifier within the communicator called a rank. The processors running each task can access their own rank number, which will play an important role in how we use MPI for embarrassingly parallel applications.

    An example schematic of the MPI_COMM_WORLD communicator with six tasks and their associated ranks is shown below.

    mpi4py

    MPI is implemented in Python with the mpi4py library. When we run an MPI code on a cluster, MPI creates the communicator and assigns each task a rank, then each task independently loads the script. The processor(s) associated with a task can then access their own unique rank.

    The following snip of code loads this library, accesses the communicator and stores the rank of the given process:

    # load the mpi4py library
    from mpi4py import MPI
    
    # access the MPI COMM WORLD communicator and assign it to a variable
    comm = MPI.COMM_WORLD
    
    # get the rank of the current process (different for each process on the cluster)
    rank = comm.Get_rank()
    
    

    Example of using mpi4py to batch parallel jobs

    Here, I’ll parallelize the submission of a Matlab script called demoScript.m. This script reads an input file from a specific file location and prints out the contents of that file. For example purposes I’ve created 20 input files, each in its own folder. The folders are called “input_sample_0”, “input_sample_1”, and so on. Each input_sample folder contains a file called “sample_data.txt”, which contains one line of text reading: “This is data for run <sample_number>”.

    All code for this example can be found on Github, here: https://github.com/davidfgold/mpi4py_blog.git

    Batching runs of demoScript.m involves three components:

    1. Write demoScript.m so that it reads the sample number from the input.
    2. Write a Python script that will use mpi4py to distribute calls of demoScript.m. Here I’ll call this script “callDemoScript.py”
    3. Write a Bash script that sets up your MPI run and calls the Python function. Here I’ll call this script “submitDemoScript.sh”

    1. demoScript.m

    The demo Matlab script is found below. It reads in two arguments that are called from the command line. The first argument is the rank, which will vary for each task, and the second is the sample number, which will specify which input folder to read from.

    %%%%%%%%%%%%%%%%%%%%
    % demoScript.m
    %
    % reads an input file from a given sample number (specified via command line)
    % prints output from the sample file associated with the sample number
    % also prints the rank for demonstration purposes
    %%%%%%%%%%%%%%%%%%%%
    
    % read in command line input
    arg_list = argv();
    rank = arg_list{1,1}; % rank is the first argument
    sample = arg_list{2, 1}; % sample number is the second argument
    
    % Create a string that contains the location of the proper sample directory
    sample_out = fileread(strcat("input_sample_", sample, "/sample_data.txt"));
    
    % create a string to print the rank number
    rank_call = strcat("This is rank_", rank, ", receiving the following input: \n");
    
    % format the output and print
    output = strcat(rank_call, sample_out);
    fprintf(output)
    
    

    2. callDemoScript.py

    The second component is a Python script that uses mpi4py to call demoScript.m many times across different tasks. Each task will run a number of samples equal to a variable called “N_SAMPLES_PER_TASK” which will be fed to this script when it is called.

    '''
    callDemoScript.py
    
    Called to batch demoScript.m across multiple MPI tasks
    
    Reads in the total tasks and number of samples per task from command line.
    '''
    # load necessary libraries
    from mpi4py import MPI
    import numpy as np
    import sys
    import os
    import time
    
    # locate the COMM WORLD communicator
    comm = MPI.COMM_WORLD
    
    # get the number of the current rank
    rank = comm.Get_rank()
    
    # read in arguments from the submission script
    TOTAL_TASKS = int(sys.argv[1]) # number of MPI processes
    N_SAMPLES_PER_TASK = int(sys.argv[2]) # number of runs per/task
    
    # loop through samples assigned to current rank
    for i in range(N_SAMPLES_PER_TASK):
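    	# samples are striped across ranks: with 4 tasks, rank 0 runs samples 0, 4, 8, ... and rank 1 runs samples 1, 5, 9, ...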
    	sample= rank + TOTAL_TASKS * i
    
    	# write the command that will be sent to the terminal (here RUN will replace the {})
    	terminal_command = "octave-cli ./demoScript.m {} {} ".format(rank, sample)
    
    	# write the terminal command to the process
    	os.system(terminal_command)
    
    	# sleep before submitting the next command
    	time.sleep(1) # optional, for memory intensive submissions
    
    comm.Barrier()
    
    

    submitDemoScript.sh

    The final component is a Bash script that will send this MPI job to the cluster. Here I’ll use SLURM to create 4 MPI tasks across 2 nodes (2 tasks per node). Each task will be assigned 5 samples to run.

    I wrote this for a local cluster at Cornell, note that I had to load two modules to run Python and a third to run Octave (which is used to call Matlab scripts on Linux). I’ll call the Python script with mpirun, and then specify the total number of MPI tasks before making the function call. The output of the script is printed to a text file called demoOutput.txt

    #!/bin/bash
    # SBATCH directives must appear before any executable commands.
    # Note: SLURM does not expand shell variables on #SBATCH lines, so the
    # node and task counts are hard-coded here (2 nodes x 2 tasks per node)
    #SBATCH -n 4 -N 2
    #SBATCH --time=0:01:00
    #SBATCH --job-name=demoMPI4py
    #SBATCH --output=output/demo.out
    #SBATCH --error=output/demo.err
    #SBATCH --exclusive

    # Set up your parallel runs
    SAMPLES_PER_TASK=5 # number of runs for each MPI task
    N_NODES=2 # number of nodes
    TASKS_PER_NODE=2 # number of tasks per node

    TOTAL_TASKS=$(($N_NODES*$TASKS_PER_NODE)) # total number of tasks

    # Load required modules and submit the parallel job
    module load py3-mpi4py
    module load py3-numpy
    module load octave/6.3.0

    mpirun -np $TOTAL_TASKS python3 callDemoScript.py $TOTAL_TASKS $SAMPLES_PER_TASK > demoOutput.txt
    
    

    Additional resources

    Putting some thought into how you design a set of parallel runs can save you a lot of time and headache. The example above has worked well for me when submitting sets of embarrassingly parallel tasks, but each application will be different, so take the time to find the procedure that works best for you. Our blog and the internet are full of resources that can help you parallelize your code, below are some suggestions:

    Performing Experiments on HPC Systems

    Scaling experiments: how to measure the performance of parallel code on HPC systems

    Parallel processing with R on Windows

    How to automate scripts on a cluster

    Parallelization of C/C++ and Python on Clusters

    Developing parallelised code with MPI for dummies, in C (Part 1/2)

    Cornell CAC glossary on HPC terms: https://cvw.cac.cornell.edu/main/glossary

    A great MPI tutorial I found online: https://mpitutorial.com/tutorials/

    A non-intimidating introduction to parallel computing with Numba

    This blog post is adapted from material I learned during the 2021 San Diego Supercomputer Center (SDSC) Summer Institute. This was an introductory boot camp to high-performance computing (HPC), and one of the modules taught the application of Numba for in-line parallelization and speeding up of Python code.

    What is Numba?

    According to its official web page, Numba is a just-in-time (JIT) compiler that translates subsets of Python and NumPy code into fast machine code, enabling it to run at speeds approaching that of C or Fortran. This is because JIT compilation enables specific lines of code to be compiled or activated only when necessary. Numba also makes use of cache memory to generate and store the compiled version of all data types passed to a specific function, which eliminates the need for recompilation every time the same data type is called when a function is run.

    This blog post will demonstrate a simple example of using Numba and its most commonly-used decorator, @jit, via Jupyter Notebook. The Binder file containing all the executable code can be found here.

    Note: The ‘@‘ flag is used to indicate the use of a decorator

    Installing Numba and Setting up the Jupyter Notebook

    First, in your command prompt, enter:

    pip install numba

    Alternatively, you can also use:

    conda install numba

    Next, import Numba:

    import numpy as np
    import numba
    from numba import jit
    from numba import vectorize

    Great! Now let’s move onto using the @jit decorator.

    Using @jit for executing functions on the CPU

    The @jit decorator works best on numerical functions that use NumPy. It has two modes: nopython mode and object mode. Setting nopython=True tells the compiler to run the entire decorated function without involving the Python interpreter. This setting leads to the best performance. However, in the case when:

    1. nopython=True fails
    2. nopython=False, or
    3. nopython is not set at all

    the compiler defaults to object mode. Then, Numba will identify the loops that it can compile into machine code and will run the remaining code in the interpreter.

    Here, @jit is demonstrated on a simple matrix multiplication function:

    # a function that does multiple matrix multiplication
    @jit(nopython=True)
    def matrix_multiplication(A, x):
        b = np.empty(shape=(x.shape[0],1), dtype=np.float64)
        for i in range(x.shape[0]):
            b[i] = np.dot(A[i,:], x)
        return b

    Remember – the use of @jit means that this function has not been compiled yet! Compilation only happens when you call the function:

    A = np.random.rand(10, 10)
    x = np.random.rand(10, 1)
    matrix_multiplication(A, x)

    But how much faster is Numba really? To find out, some benchmarking is in order. Jupyter Notebook has a handy function called %timeit that runs simple functions many times in a loop to get their average execution time, that can be used as follows:

    %timeit matrix_multiplication(A,x)
    
    # 11.4 µs ± 7.34 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

    Numba has a special .py_func attribute that effectively allows the decorated function to run as the original uncompiled Python function. Using this to compare its runtime to that of the decorated version,

    %timeit matrix_multiplication.py_func(A,x)
    
    # 35.5 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

    From here, you can see that the Numba version runs about 3 times faster than using only NumPy arrays. In addition to this, Numba also supports tuples, integers, floats, and Python lists. All other Python features supported by Numba can be found here.

    Besides explicitly declaring @jit at the start of a function, Numba makes it simple to turn a NumPy function into a Numba function by attaching jit(nopython=True) to the original function. This essentially uses the @jit decorator as a function. The function to calculate absolute percentage relative error demonstrates how this is done:

    # Calculate percentage relative error
    def numpy_re(x, true):
        return np.abs(((x - true)/true))*100
    
    numba_re = jit(nopython=True)(numpy_re)

    And we can see how the Numba version is faster:

    %timeit numpy_re(x, 0.66)
    %timeit numba_re(x, 0.66)

    where the NumPy version takes approximately 2.61 microseconds to run, while the Numba version takes 687 nanoseconds.

    Inline parallelization with Numba

    The @jit decorator can also be used to enable inline parallelization by setting its parallel=True flag. Parallelization in Numba is done via multi-threading, which essentially creates threads of code that are distributed over all the available CPU cores. An example of this can be seen in the code snippet below, describing a function that calculates the normal probability density of a set of data given arrays of means and standard deviations:

    SQRT_2PI = np.sqrt(2 * np.pi)
    
    @jit(nopython=True, parallel=True)
    def normals(x, means, sds):
        n = means.shape[0]
        result = np.exp(-0.5*((x - means)/sds)**2)
        return (1 / (sds * SQRT_2PI)) * result

    As usual, the function must be compiled:

    means = np.random.uniform(-1,1, size=10**8)
    sds = np.random.uniform(0.1, 0.2, size=10**8)
    
    normals(0.6, means, sds)

    To appreciate the speed-up that Numba’s multi-threading provides, compare the runtime for this with:

    1. A decorated version of the function with a disabled parallel pass
    2. The uncompiled, original NumPy function

    The first example can be timed by:

    normals_deco_nothread = jit(nopython=True)(normals.py_func)
    %timeit normals_deco_nothread(0.6, means, sds)
    
    # 3.24 s ± 757 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    The first line of the code snippet makes an uncompiled copy of the normals function and then applies the @jit decorator to it. This effectively creates a version of normals that uses @jit but is not multi-threaded. This run of the function took approximately 3.2 seconds.

    For the second example, simply:

    %timeit normals.py_func(0.6, means, sds)
    
    # 7.38 s ± 759 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    Now, compare both these examples to the runtime of the decorated and multi-threaded normals function:

    %timeit normals(0.6, means, sds)
    
    # 933 ms ± 155 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    The decorated, multi-threaded function is significantly faster (933 ms) than the decorated function without multi-threading (3.24 s), which in turn is faster than the uncompiled original NumPy function (7.38 s). However, the degree of speed-up may vary depending on the number of CPUs that the machine has available.

    Summary

    In general, the improvements achieved by using Numba on top of NumPy functions are marginal for simple, few-loop functions. Nevertheless, Numba is particularly useful for large datasets or high-dimensional arrays that require a large number of loops, and would benefit from the one-and-done compilation that it enables. For more information on using Numba, please refer to its official web page.

    Simple profiling checks for running jobs on clusters

    The goal of this short blog post is to share some simple tips on profiling your (to be) submitted jobs on high performance computing resources. Profiling your jobs can give you information about how efficiently you are using your computational resources, i.e., your CPUs and your allocated memory. Typically you would perform these checks on your experiment at a smaller scale, ensuring that everything is working as it should, before expanding to more tasks and CPUs.

    Your first check is squeue, typically paired with your user ID on the cluster. Here’s an example:

    (base) [ah986@login02 project_dir]$ squeue -u ah986
                 JOBID PARTITION     NAME      USER  ST       TIME  NODES NODELIST(REASON) 
               5688212    shared <job_name>    ah986  R       0:05      1 exp-4-55 
    

    This tells me that my submitted job is utilizing 1 node in the shared partition of this cluster. If your cluster is using the SLURM scheduler, you can also use sacct, which can display accounting data for all jobs you are currently running or have run in the past. There are many pieces of information available with sacct, which you can specify using the --format flag. Here’s an example for the same job:

    (base) [ah986@login02 project_dir]$ sacct --format=JobID,partition,state,time,start,end,elapsed,nnodes,ncpus,nodelist,AllocTRES%32 -j 5688212
           JobID  Partition      State  Timelimit               Start                 End    Elapsed   NNodes      NCPUS        NodeList                        AllocTRES 
    ------------ ---------- ---------- ---------- ------------------- ------------------- ---------- -------- ---------- --------------- -------------------------------- 
    5688212          shared    RUNNING   20:00:00 2021-09-08T10:55:40             Unknown   00:19:47        1        100        exp-4-55 billing=360000,cpu=100,mem=200G+ 
    5688212.bat+               RUNNING            2021-09-08T10:55:40             Unknown   00:19:47        1        100        exp-4-55          cpu=100,mem=200G,node=1 
    5688212.0                  RUNNING            2021-09-08T10:55:40             Unknown   00:19:47        1        100        exp-4-55          cpu=100,mem=200G,node=1 
    

    In this case I can see the number of nodes (1) and the number of cores (100) utilized by my job as well as the resources allocated to it (100 CPUs and 200G of memory on 1 node). This information is useful in cases where a task launches other tasks and you’d like to diagnose whether the correct number of cores is being used.

    Another useful tool is seff, which is actually a wrapper around sacct and summarizes your job’s overall performance. It is a little unreliable while the job is still running, but after the job is finished you can run:

    (base) [ah986@login02 project_dir]$ seff 5688212
    Job ID: 5688212
    Cluster: expanse
    User/Group: ah986/pen110
    State: COMPLETED (exit code 0)
    Nodes: 1
    Cores per node: 100
    CPU Utilized: 1-01:59:46
    CPU Efficiency: 68.16% of 1-14:08:20 core-walltime
    Job Wall-clock time: 00:22:53
    Memory Utilized: 38.25 GB
    Memory Efficiency: 19.13% of 200.00 GB
    

    The information here is very useful if you want to find out how efficiently you’re using your resources. For this example I had 100 separate tasks I needed to perform and I requested 100 cores on 1 node and 200 GB of memory. These results tell me that my job completed in 23 minutes or so, the total time using the CPUs (CPU Utilized) was 1-01:59:46 (about 26 core-hours), and, most importantly, the efficiency of my CPU use. CPU Efficiency is calculated “as the ratio of the actual core time from all cores divided by the number of cores requested divided by the run time”, in this case 68.16%. What this means is that I could be utilizing my cores more efficiently by allocating fewer cores to the same number of tasks, especially if scaling up to a larger number of nodes/cores. Additionally, my allocated memory is underutilized and I could be requesting a smaller memory allocation without inhibiting my runs.
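
    To make the CPU efficiency number concrete, here is the arithmetic using the seff output above:

    CPU Efficiency = CPU Utilized / (cores requested × wall-clock time)
                   = 1-01:59:46 / (100 × 00:22:53)
                   = 93,586 s / 137,300 s
                   ≈ 68.16%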

    Finally, while your job is still running you can log in to the node(s) executing the job to look at live data. To do so, you simply ssh to one of the nodes listed under NODELIST (not all clusters allow this). From there, you can run the top command like below (with your own username), which will start the live task manager:

    (base) [ah986@r143 ~]$ top -u ah986
    
    top - 15:17:34 up 25 days, 19:55,  1 user,  load average: 0.09, 12.62, 40.64
    Tasks: 1727 total,   2 running, 1725 sleeping,   0 stopped,   0 zombie
    %Cpu(s):  0.3 us,  0.1 sy,  0.0 ni, 99.6 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
    MiB Mem : 257662.9 total, 249783.4 free,   5561.6 used,   2317.9 buff/cache
    MiB Swap: 716287.0 total, 716005.8 free,    281.2 used. 250321.1 avail Mem 
    
       PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                              
     78985 ah986     20   0  276212   7068   4320 R   0.3   0.0   0:00.62 top                                                                                                                                  
     78229 ah986     20   0  222624   3352   2936 S   0.0   0.0   0:00.00 slurm_script                                                                                                                         
     78467 ah986     20   0  259464   8128   4712 S   0.0   0.0   0:00.00 srun                                                                                                                                 
     78468 ah986     20   0   54520    836      0 S   0.0   0.0   0:00.00 srun                                                                                                                                 
     78481 ah986     20   0  266404  19112   4704 S   0.0   0.0   0:00.24 parallel                                                                                                                             
     78592 ah986     20   0  217052    792    720 S   0.0   0.0   0:00.00 sleep                                                                                                                                
     78593 ah986     20   0  217052    732    660 S   0.0   0.0   0:00.00 sleep                                                                                                                                
     78594 ah986     20   0  217052    764    692 S   0.0   0.0   0:00.00 sleep                                                                                                                                
     78595 ah986     20   0  217052    708    636 S   0.0   0.0   0:00.00 sleep                                                                                                                                
     78596 ah986     20   0  217052    708    636 S   0.0   0.0   0:00.00 sleep                                                                                                                                
     78597 ah986     20   0  217052    796    728 S   0.0   0.0   0:00.00 sleep                                                                                                                                
     78598 ah986     20   0  217052    732    660 S   0.0   0.0   0:00.00 sleep       
    

    Memory and CPU usage can be tracked from RES and %CPU columns respectively. In this case, for the sake of an example, I just assigned all my cores to sleep a certain number of minutes each (using no CPU or memory). Similar information can also be obtained using the ps command, with memory being tracked under the RSS column.

     (base) [ah986@r143 ~]$ ps -u$USER -o %cpu,rss,args
    %CPU   RSS COMMAND
     0.0  3352 /bin/bash /var/spool/slurm/d/job3509431/slurm_script
     0.0  8128 srun --export=all --exclusive -N1 -n1 parallel -j 100 sleep {}m ::: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
     0.0   836 srun --export=all --exclusive -N1 -n1 parallel -j 100 sleep {}m ::: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
     0.1 19112 /usr/bin/perl /usr/bin/parallel -j 100 sleep {}m ::: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
     0.0   792 sleep 3m
     0.0   732 sleep 4m
     0.0   764 sleep 5m
     0.0   708 sleep 6m
     0.0   708 sleep 7m
     0.0   796 sleep 8m
     0.0   732 sleep 9m
     0.0   712 sleep 10m
    

    Measuring the parallel performance of the Borg MOEA

    In most applications, parallel computing is used to improve code efficiency and expand the scope of problems that can be addressed computationally (for background on parallelization, see references listed at the bottom of this post). For the Borg Many Objective Evolutionary Algorithm (MOEA) however, parallelization can also improve the quality and reliability of many objective search by enabling a multi-population search. The multi-population implementation of Borg is known as Multi-Master Borg, details can be found here. To measure the performance of Multi-Master Borg, we need to go beyond basic parallel efficiency (discussed in my last post, here), which measures the efficiency of computation but not the quality of the many objective search. In this post, I’ll discuss how we measure the performance of Multi-Master Borg using two metrics: hypervolume speedup and reliability.

    Hypervolume speedup

    In my last post, I discussed traditional parallel efficiency, which measures the improvement in speed and efficiency that can be achieved through parallelization. For many objective search, speed and efficiency of computation are important, but we are more interested in the speed and efficiency with which the algorithm produces high quality solutions. We often use the hypervolume metric to measure the quality of an approximation set as it captures both convergence and diversity (for a thorough explanation of hypervolume, see this post). Using hypervolume as a measure of search quality, we can then evaluate hypervolume speedup, defined as:

    Hypervolume speedup = \frac{T_S^H}{T_P^H}

where T_S^H is the time it takes the serial version of the MOEA to achieve a given hypervolume threshold, and T_P^H is the time it takes the parallel implementation to reach the same threshold. Figure 1 below, adapted from Hadka and Reed (2015), shows the hypervolume speedup across different parallel implementations of the Borg MOEA for the five objective LRGV test problem run on 16,384 processors (in this work the parallel epsilon-NSGA II algorithm is used as a baseline rather than a serial implementation). Figure 1 reveals that the Multi-Master implementations of Borg reach each hypervolume threshold faster than both the baseline algorithm and the master-worker implementation. For high hypervolume thresholds, the 16- and 32-master implementations reach the thresholds 10 times faster than the baseline.

Figure 1: Hypervolume speedup for the five objective LRGV test problem across implementations of the Borg MOEA (epsilon-NSGA II, another algorithm, is used as the baseline here). This figure is adapted from Hadka and Reed (2015).
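To make the hypervolume speedup metric concrete, here is a minimal sketch (not from the original study) of how it could be computed from runtime hypervolume traces. The traces, the threshold, and the function name time_to_threshold are all hypothetical placeholders.

# Sketch: computing hypervolume speedup from runtime hypervolume traces.
# The times, hypervolumes, and threshold below are hypothetical placeholders.
import numpy as np

def time_to_threshold(times, hypervolumes, threshold):
    """Return the first time at which the hypervolume trace reaches the threshold."""
    times = np.asarray(times)
    hypervolumes = np.asarray(hypervolumes)
    reached = np.nonzero(hypervolumes >= threshold)[0]
    return times[reached[0]] if reached.size > 0 else np.nan

# hypothetical runtime traces: elapsed hours and the hypervolume attained by then
serial_t, serial_hv = [1, 2, 4, 8, 16], [0.20, 0.50, 0.70, 0.85, 0.90]
parallel_t, parallel_hv = [0.5, 1, 2, 4], [0.50, 0.75, 0.88, 0.92]

threshold = 0.85
T_S = time_to_threshold(serial_t, serial_hv, threshold)      # T_S^H
T_P = time_to_threshold(parallel_t, parallel_hv, threshold)  # T_P^H
print(f"Hypervolume speedup at threshold {threshold}: {T_S / T_P:.1f}")

With these made-up traces the serial run reaches the threshold at 8 hours and the parallel run at 2 hours, giving a hypervolume speedup of 4.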

    Reliability

MOEAs are inherently stochastic algorithms: they are initialized with random seeds, which may speed up or slow down the search process. To ensure high quality Pareto-approximate sets, it's standard practice to run an MOEA across many random seeds and extract the best solutions across all seeds as the final solution set. Reliability is a measure of the probability that each seed will achieve a high quality set of solutions. Algorithms with higher reliability allow users to run fewer random seeds, which saves computational resources and speeds up the search process. Salazar et al. (2017) examined the performance of 17 configurations of Borg on the Lower Susquehanna River Basin (LSRB) problem for a fixed 10 hour runtime. Figure 2 shows the performance of each configuration across 50 random seeds. A configuration that achieves the best hypervolume across all seeds would be represented as a blue bar extending to the top of the plot. The algorithmic configurations are shown to the right of the plot. These results show that although configuration D, which has a high core count and low master count, achieves the best overall hypervolume, it does not do so reliably. Configuration H, which has two masters, achieves nearly the same hypervolume with much higher reliability. Configuration L, which has four masters, achieves a lower maximum hypervolume but has very little variance across random seeds.

    Figure 2: Reliability of search adapted from Salazar et al., (2017). Each letter represents a different algorithmic configuration (shown right) for the many objective LSRB problem across 10 hours of runtime. The color represents the probability that each configuration was able to attain a given level of hypervolume across 50 seeds.
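As a rough illustration of the reliability idea (not the exact procedure used by Salazar et al.), the sketch below counts the fraction of random seeds whose final hypervolume reaches a chosen target, here 90% of the best hypervolume found across all seeds. The seed hypervolumes and the 90% target are hypothetical placeholders.

# Sketch: reliability as the fraction of seeds attaining a target hypervolume.
# The seed hypervolumes and the 90% target are hypothetical placeholders.
import numpy as np

seed_hypervolumes = np.array([0.91, 0.88, 0.93, 0.62, 0.90, 0.87, 0.92, 0.55])
target = 0.9 * seed_hypervolumes.max()  # e.g., 90% of the best hypervolume attained

reliability = np.mean(seed_hypervolumes >= target)
print(f"Target hypervolume: {target:.3f}")
print(f"Reliability across {seed_hypervolumes.size} seeds: {reliability:.2f}")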

These results can be further examined by looking at the quality of search across the runtime. In Figure 3, Salazar et al. (2017) compare the performance of the three algorithmic configurations highlighted above (D, H, and L). The hypervolume achieved by the maximum and minimum seeds is shown by the shaded areas, and the median hypervolume is shown by each line. Figure 3 clearly demonstrates how improved reliability can enhance search. Though the Multi-Master implementation performs fewer function evaluations within the 10 hour runtime, it has very low variance across seeds. The master-worker implementation, on the other hand, achieves better performance with its best seed (it gets lucky), but its median performance never reaches the hypervolume of the two- or four-master configurations.

Figure 3: Runtime hypervolume dynamics for the LSRB problem from Salazar et al. (2017). The reduction in variance in the Multi-Master implementations of Borg demonstrates the benefits of improved reliability.

    Concluding thoughts

The two measures introduced above allow us to quantify the benefits of parallelizing the Multi-Master Borg MOEA. The improvements to search quality not only reduce the time and resources that we need to expend on many objective search, but may also allow us to discover solutions that would be missed by the serial or master-worker implementations of the algorithm. In many objective optimization contexts, this improvement may fundamentally alter our understanding of what is possible in challenging environmental optimization problems.


    References

    Hadka, D., & Reed, P. (2015). Large-scale parallelization of the Borg multiobjective evolutionary algorithm to enhance the management of complex environmental systems. Environmental Modelling & Software, 69, 353-369.

Salazar, J. Z., Reed, P. M., Quinn, J. D., Giuliani, M., & Castelletti, A. (2017). Balancing exploration, uncertainty and computational demands in many objective reservoir optimization. Advances in Water Resources, 109, 196-210.

    How to schedule massively parallel jobs on clusters – some basic ways

Massively (or embarrassingly) parallel processes are processes that are either completely independent or can easily be made so. These are cases where tasks don't need to pass information from one to another (they don't share memory) and can be executed independently of one another on whatever resources are available; for example, large Monte Carlo runs, each representing a different set of model parameters.

There isn't any guidance on how to do this on the blog besides an older post on how to do it using PBS, but most of our current resources use SLURM. So I am going to show two ways: a) using SLURM job arrays; and b) using the GNU parallel module. Both methods allow tasks to be distributed across multiple cores and across multiple nodes. In terms of how it affects your workflow, the main difference between the two is that GNU parallel allows you to automatically resume/rerun a task that has failed, whereas with SLURM job arrays you have to resubmit failed tasks manually.

Your first step with either method is to configure the function representing each task so that it can receive a task ID as an argument. For example, if I would like to run my model over 100 parameter combinations, I would create my model function as function_that_executes_model(sample=i, [other_arguments]), where the sample number i corresponds to one of my parameter combinations and to the respective task to be submitted.

For Python, this function needs to be contained within a .py script that executes the function when called. Your .py script could look like this, using argparse to parse the function arguments (there are alternatives):

import argparse
# import any other packages your model needs

other_arg1 = 1
other_arg2 = 'model'

def function_that_executes_model(sample, other_arg1, other_arg2):
    # do stuff pertaining to sample number `sample`
    return

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='This function executes the model with a sample number')
    parser.add_argument('i', type=int,
                        help='sample number')
    args = parser.parse_args()
    function_that_executes_model(args.i, other_arg1, other_arg2)
    

    To submit this script (say you saved it as function_executor.py) using SLURM job arrays:

    #!/bin/bash
    #SBATCH --partition=compute   # change to your own cluster partition
#SBATCH --cpus-per-task=1     # number of cores for each task
    #SBATCH -t 0:45:00            # max wallclock time
    #SBATCH --array=1-100         # array of tasks to execute
    
    module load python
    srun python3 function_executor.py $SLURM_ARRAY_TASK_ID
    

This will submit an array of 100 one-core tasks to the cluster's scheduler; in your queue each task will be listed as JOBID_TASKID.

    Alternatively, you can use your cluster’s GNU parallel module to submit this like so:

    #!/bin/bash
    #SBATCH --partition=compute
    #SBATCH --ntasks=100
    #SBATCH --time=00:45:00
    
    module load parallel
    module load python
    
# This specifies the options used to run srun. The "-N1 -n1" options are
# used to allocate a single core to each task.
    srun="srun --export=all --exclusive -N1 -n1"
    
    # This specifies the options used to run GNU parallel:
    #
    #   -j is the number of tasks run simultaneously.
    
    parallel="parallel -j $SLURM_NTASKS"
    
    $parallel "$srun python3 function_executor.py" ::: {1..100}
    
    

This will instead submit one 100-core job where each core executes one task. GNU parallel also allows for several additional options that I find useful, like using a log to track task execution (--joblog runtask.log) and --resume, which (used together with --joblog) will identify the last unfinished task and resume from there the next time you submit the script.

    Scaling experiments: how to measure the performance of parallel code on HPC systems

    Parallel computing allows us to speed up code by performing multiple tasks simultaneously across a distributed set of processors. On high performance computing (HPC) systems, an efficient parallel code can accomplish in minutes what might take days or even years to perform on a single processor. Not all code will scale well on HPC systems however. Most code has inherently serial components that cannot be divided among processors. If the serial component is a large segment of a code, the speedup gained from parallelization will greatly diminish. Memory and communication bottlenecks also present challenges to parallelization, and their impact on performance may not be readily apparent.

To measure the parallel performance of a code, we perform scaling experiments. Scaling experiments are useful as 1) a diagnostic tool to analyze the performance of your code and 2) evidence of code performance that can be used when requesting allocations on HPC systems (for example, NSF's XSEDE program requires scaling information when requesting allocations). In this post I'll outline the basics of performing a scaling analysis of your code and discuss how these results are often presented in allocation applications.

    Amdahl’s law and strong scaling

One way to measure the performance of a parallel code is through what is known as "speedup", which measures the ratio of computational time in serial to the time in parallel:

    speedup = \frac{t_s}{t_p}

    Where t_s is the serial time and t_p is the parallel time.

The maximum speedup of any code is limited by the portion of the code that is inherently serial. In the 1960s, Gene Amdahl formalized this limitation by presenting what is now known as Amdahl's law:

speedup = \frac{t_s}{t_p} = \frac{1}{s+(1-s)/p} < \frac{1}{s}

    Where p is the number of processors, and s is the fraction of work that is serial.

On its face, Amdahl's law seems like a severe limitation for parallel performance. If just 10% of your code is inherently serial, then the maximum speedup you can achieve is a factor of 10 (s = 0.10, 1/0.1 = 10). This means that even if you run your code over 1,000 processors, the code will run at most 10 times faster, and the returns from adding processors diminish quickly. Luckily, in water resources applications the inherently serial fraction of many codes is very small (think ensemble model runs or MOEA function evaluations).
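To see how quickly those returns diminish, here is a small sketch that evaluates Amdahl's law for a hypothetical 10% serial fraction across several processor counts; the serial fraction and processor counts are illustrative, not from any particular code.

# Sketch: predicted speedup from Amdahl's law for a hypothetical serial fraction.
def amdahl_speedup(s, p):
    """Amdahl's law: s is the serial fraction, p is the number of processors."""
    return 1.0 / (s + (1.0 - s) / p)

s = 0.10  # hypothetical: 10% of the code is inherently serial
for p in [1, 2, 8, 32, 128, 1024]:
    print(f"{p:5d} processors -> speedup {amdahl_speedup(s, p):5.2f} (limit {1/s:.0f})")

Even with 1,024 processors the predicted speedup only approaches the limit of 10, which is why minimizing the serial fraction of a code matters so much.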

Experiments that measure the speedup of parallel code are known as "strong scaling" experiments. To perform a strong scaling experiment, you fix the amount of work for the code to do (i.e., run 10,000 MOEA function evaluations) and examine how long it takes to finish with varying processor counts. Ideally, your speedup will increase linearly with the number of processors. Agencies that grant HPC allocations (like NSF XSEDE) like to see the results of strong scaling experiments presented visually. Below, I've adapted a figure from an XSEDE training on how to assess performance and scaling:

Plots like this are easy for funding agencies to assess. Good scaling performance can be observed in the lower left corner of the plot, where the speedup increases roughly linearly with the number of processors. When the speedup starts to fall away from the ideal linear trend but has not yet leveled off, the scaling is likely still acceptable. The peak of the curve represents poor scaling: that point will actually give the fastest runtime, but it does not represent an efficient use of the parallel system.
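If you have timed your code at several processor counts, the numbers behind a plot like this can be produced with a few lines. This is only a sketch; the processor counts and runtimes below are hypothetical placeholders for your own measurements.

# Sketch: summarizing a strong scaling experiment from measured runtimes.
# Processor counts and runtimes (seconds) are hypothetical placeholders.
procs    = [1, 2, 4, 8, 16, 32]
runtimes = [1000, 510, 265, 140, 80, 55]

t_serial = runtimes[0]
for p, t in zip(procs, runtimes):
    speedup = t_serial / t
    efficiency = speedup / p  # parallel efficiency relative to ideal linear scaling
    print(f"{p:3d} procs: time {t:6.1f} s, speedup {speedup:5.2f}, efficiency {efficiency:4.2f}")

Plotting the speedup column against processor count, together with the ideal linear line, gives the figure format that allocation reviewers expect.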

    Gustafson’s law and weak scaling

    Many codes will not show acceptable scaling performance when analyzed with strong scaling due to inherently serial sections of code. While this is obviously not a desirable attribute, it does not necessarily mean that parallelization is useless. An alternative measure of parallel performance is to measure the amount of additional work that can be completed when you increase the number of processors. For example, if you have a model that needs to read a large amount of input data, the code may perform poorly if you only run it for a short simulation, but much better if you run a long simulation.

In the 1980s, John Gustafson proposed a relationship that relates parallel performance to the amount of work a code can accomplish. This relationship has since been termed Gustafson's law:

speedup = s + pN

Where s and p are the fractions of the code that are serial and parallel, respectively (with s + p = 1), and N is the number of cores.

Gustafson's law removes the inherent limits from serial sections of code and allows for another type of scaling analysis, termed "weak scaling". Weak scaling is often measured by "efficiency" rather than speedup. Efficiency is calculated by proportionally scaling the amount of work with the number of processors and measuring the ratio of completion times:

    efficiency = \frac{t_1}{t_N}

Ideally, efficiency will be close to one (the time it takes one processor to do one unit of work is the same time it takes N processors to do N units of work). For resource allocations, it is again advisable to visualize the results of weak scaling experiments by creating plots like the one shown below (again adapted from the XSEDE training).
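A weak scaling experiment can be summarized the same way as the strong scaling one above. In the sketch below the problem size grows proportionally with the processor count, and efficiency is the single-processor time divided by the N-processor time; all counts and timings are hypothetical placeholders.

# Sketch: weak scaling efficiency, where the work grows with the processor count.
# Processor counts and measured runtimes (seconds) are hypothetical placeholders.
procs    = [1, 4, 16, 64, 256]
runtimes = [100, 104, 110, 121, 140]  # each run does N units of work on N processors

t_1 = runtimes[0]
for n, t_n in zip(procs, runtimes):
    efficiency = t_1 / t_n  # ideally close to 1
    print(f"{n:4d} procs: time {t_n:6.1f} s, weak scaling efficiency {efficiency:4.2f}")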

    Final thoughts

Scaling experiments will help you understand how your code will scale and give you a realistic idea of the computational requirements for large experiments. Unfortunately, however, they will not diagnose the source of poor scaling. To improve scaling performance, it often helps to improve the serial version of your code as much as possible. A helpful first step is to profile your code. Other useful tips are to reduce the frequency of data input/output and (if using compiled code) to check the flags on your compiler (see some other tips here).

    Automate remote tasks with Paramiko

This is a short blog post to demonstrate the Paramiko Python package. Paramiko allows you to establish SSH, SCP, or SFTP connections within Python scripts, which is handy when you'd like to automate repetitive tasks on a remote server or cluster from your local machine, or from another cluster you're running on.

It is often used for server management tasks, but for research applications you could consider situations where you have a large dataset stored at a remote location and are executing a script that needs to transfer some of that data depending on results or new information. Instead of manually establishing SSH or SFTP connections, those processes can be wrapped and automated within your existing Python script.

To begin a connection, all you need is a couple of lines:

    import paramiko
    
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname='remotehost', username='yourusername', password='yourpassword')
    

    The first line creates a paramiko SSH client object. The second line tells paramiko what to do if the host is not a known host (i.e., whether this host should be trusted or not)—think of when you’re setting up an SSH connection for the first time and get the message:

    The authenticity of host ‘name’ can’t be established. RSA key fingerprint is ‘gibberish’. Are you sure you want to continue connecting (yes/no)?

The third line is what makes the connection; the hostname, username, and password are usually the only things you need to define.

Once a connection is established, commands can be executed with exec_command(), which returns three objects:

    stdin, stdout, stderr = ssh_client.exec_command("ls")
    

stdin is a write-only file object which can be used for commands requiring input, stdout contains the output of the command, and stderr contains any errors produced by the command; if there are no errors, it will be empty.

To print out what's returned by the command, you can use stdout.readlines(). To pass input to stdin, use the write() function:

stdin, stdout, stderr = ssh_client.exec_command("sudo ls")
stdin.write('password\n')
    

    Importantly: don’t forget to close your connection, especially if this is an automated script that opens many of them: ssh_client.close().

To transfer files, you need to establish an SFTP or an SCP connection, in much the same manner:

    ftp_client=ssh_client.open_sftp()
    ftp_client.get('/remote/path/to/file/filename','/local/path/to/file/filename')
    ftp_client.close()
    

get() will transfer a remote file to a local directory; put(), used in the same way, will transfer a local file to a remote directory.
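Putting the pieces together, a minimal end-to-end sketch might look like the following. The hostname, credentials, command, and file paths are all hypothetical placeholders, and the try/finally block is simply one way to guarantee the connection is closed even if a step fails.

import paramiko

# connect to a hypothetical remote host
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname='remotehost', username='yourusername', password='yourpassword')

try:
    # run a command remotely and print its output
    stdin, stdout, stderr = ssh_client.exec_command('ls /remote/path/to/results')
    print(stdout.readlines())

    # transfer a result file back to the local machine over SFTP
    ftp_client = ssh_client.open_sftp()
    ftp_client.get('/remote/path/to/results/output.txt', '/local/path/to/output.txt')
    ftp_client.close()
finally:
    ssh_client.close()  # always close the connection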