Introduction to mpi4py

If you like parallel computing, and you like Python, chances are you ‘ll like mpi4py. It’s a Python package that provides MPI bindings, which is very helpful if you’re trying to pass arrays or dictionaries between processors without worrying about the details as you would in a lower level language. Installation can be tricky, but we already have mpi4py installed on the Cube cluster for those of you who have accounts.

Getting started is easy:

from mpi4py import MPI
print "Hello from rank %d out of %d !" % (comm.rank, comm.size)
comm.Barrier() # wait to sync here (not needed for this example)

Then in your submission script:

#PBS -l walltime=1:00:00
#PBS -l nodes=4:ppn=16
#PBS -j oe

mpirun python

And the output:

Hello from rank 12 out of 64 !
Hello from rank 14 out of 64 !
Hello from rank 13 out of 64 !
Hello from rank 51 out of 64 !
Hello from rank 52 out of 64 !

Let’s try something more interesting. In this example we’ll do a set of parallel runs of a very simple linear rainfall-runoff model (one bucket, where dS/dt = -kS). The only free parameter is k, so we’ll sample a range of values between (0,1) based on the rank of each node.

(Aside: this will be an example of a model built on top of stockflow, which is a Python wrapper for solving system dynamics ODEs. You can read more about the linear reservoir example in this notebook.)


from __future__ import division
from stockflow import simulation
import numpy as np
from mpi4py import MPI


Set up the model, with only one state variable (or “stock”):

# Model setup - linear reservoir
tmin = 0
tmax = 365
dt = 1
t = np.arange(tmin,tmax,dt)

data = np.loadtxt('leaf-river-data.txt', skiprows=2)
data_P = data[tmin:tmax,0]
data_PET = data[tmin:tmax,1]
data_Q = data[tmin:tmax,2]

s = simulation(t)
s.stocks({'S': 0})

Based on the processor’s rank, assign a value of the parameter k:

k = (comm.rank+1)/comm.size

Define flows, run the model, and calculate RMSE:

# Flows: precip, ET, and streamflow
s.flow('P', start=None, end='S', f=lambda t: data_P[t])
s.flow('ET', start='S', end=None, f=lambda t: min(data_PET[t], s.S))
s.flow('Q', start='S', end=None, f=lambda t: k*s.S)

RMSE = np.sqrt(np.mean((s.Q-data_Q)**2))

At this point we have completed model results sitting on every processor, but we’d like to collect them all on the root node to do some analysis, for example to find the best RMSE value. We can do this with MPI’s gather operation. There is a great beginner tutorial here describing the basic MPI operations.

Qs = comm.gather(s.Q, root=0)
RMSEs = comm.gather(RMSE, root=0)
ks = comm.gather(k, root=0)

We’ve now collected all of the results onto the root node (0). Let’s find the best and worst values of k and print them.

if comm.rank==0:
  best = np.argmin(RMSEs)
  worst = np.argmax(RMSEs)
  print "best k = %f" % ks[best]
  print "best rmse = %f" % RMSEs[best]
  print "worst k = %f" % ks[worst]
  print "worst rmse = %f" % RMSEs[worst]

Pretty simple! If we run this with the following job script:

#PBS -l walltime=1:00:00
#PBS -l nodes=16:ppn=16
#PBS -j oe


mpirun python

… it runs on 256 processors almost instantly, and we see this output:

best k = 0.183594
best rmse = 2.439382
worst k = 1.000000
worst rmse = 4.470532

Then we can come back locally and plot the best and worst hydrographs to see what they look like:


This looks a little messy, but you can see that the “best” model run matches the observed data much more closely than the “worst”, which overshoots the peaks considerably.

That’s all for now. Happy HPCing!


3 thoughts on “Introduction to mpi4py

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s