Speed up your Python code with basic shared memory parallelization

Part 1 – The simplest case

In many situations, we have for-loops in our Python code in which iteration i+1 does not depend on iteration i. In these situations, chances are your code can be easily modified to run in less time by using multiple cores of your computer to calculate different iterations at the same time. What will be presented here works well for codes run on a desktop or on individual nodes of a cluster (different nodes do not share memory).

Suppose you ran 3,000 simulations with a given model, each of which returned a csv file with 24 columns and 10,000 values per column. Suppose that, for each file, you want to compute the sum of the values in each column. One approach would be to use code like the following:

from glob import glob
from numpy import genfromtxt

list_of_files = glob("*.csv")

results = []
for f in list_of_files:
    data = genfromtxt(f, delimiter=' ')
    data_transposed = [list(x) for x in zip(*data)] # transposed data
    totals = [sum(line) for line in data_transposed] # sums all lines (former columns, vector of length 24)
    results.append(totals)

print(results)

This code works, but it takes 11 minutes to run on an Intel® Xeon® CPU E5-1620 v2 @ 3.70 GHz (4 cores, 8 logical cores with hyper-threading). One way of making this code run faster is to parallelize the for-loop with the Pool class from the multiprocessing Python library.

A Pool object creates a pool of worker processes to handle different parts (in our case, different files) of the for-loop. Its map(function, input_list) method distributes the work among the workers and, once they are all done, merges the results into a Python list in the right order, as if you had run the for-loop above.
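
As a minimal sketch of the idea (with a made-up square function, and no __main__ guard since these examples assume a Linux-style fork start method), Pool.map gives the same list, in the same order, as a plain for-loop:

from multiprocessing import Pool

def square(x):  # stand-in for the real per-iteration work
    return x * x

serial_results = [square(x) for x in range(10)]  # plain for-loop version

p = Pool(4)  # 4 worker processes
parallel_results = p.map(square, range(10))      # parallel version
p.close()
p.join()

print(serial_results == parallel_results)  # True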

Here is the basic structure of a code that was modified to use the Pool class:

  1. The first step is to create a function, based on the code inside the for-loop, to be called by the map method.
  2. A Pool object is then created with the desired number of workers (typically between 1 and 2 times the number of cores).
  3. The Pool object distributes the work by having each worker call the function on a different part of the input list, and then puts the returned values together in a single list.

The parallelized code would then look like this:

from glob import glob
from numpy import genfromtxt
from multiprocessing import Pool

def process_file(f): 
    data = genfromtxt(f, delimiter=' ') 
    data_transposed = [list(x) for x in zip(*data)] # transposed data
    totals = [sum(line) for line in data_transposed] # sums all lines (former columns, vector of length 24)
    return totals

list_of_files = glob("*.rdm")

p = Pool(16) # create a pool of 16 workers

results = p.map(process_file, list_of_files) # perform the calculations

print(results)

The code above took 3 minutes to run, as opposed to the 11 minutes taken by its serial version.
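
The number of workers does not have to be hard-coded, either. As a small sketch, multiprocessing.cpu_count() can be used to size the pool to whatever machine the code runs on:

from multiprocessing import Pool, cpu_count

# size the pool from the machine instead of hard-coding 16;
# cpu_count() returns the number of logical cores
p = Pool(cpu_count())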

Part 2 – More complex cases: parallelization with multiple lists and parameters

Suppose now you want to calculate the weighted average of each column, where each row has a different weight. You would need to pass two different arguments to the process_file function, namely the file name and the list of weights. This can be done with the partial(function, parameter1, parameter2, parameter3, …) function from the functools library. The partial function combines a user-defined function with arguments that will be passed to that function every time it is called.
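
As a quick standalone illustration of partial (with a made-up scale function that is not part of the files example):

from functools import partial

def scale(weight, value):    # made-up example function
    return weight * value

halve = partial(scale, 0.5)  # weight is now fixed to 0.5
print(halve(10))             # 5.0 -- equivalent to scale(0.5, 10)

Here is what the parallelized weighted-average code would look like: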

from glob import glob
from numpy import genfromtxt
from multiprocessing import Pool
from functools import partial
import random

def process_file(weights, f):
    print(f)
    data = genfromtxt(f, delimiter=' ')
    n = len(data)
    data_transposed = [list(x) for x in zip(*data)] # transposed data
    averages = [sum(l*w for l, w in zip(line, weights)) / n for line in data_transposed] # weighted average of each line (former column); vector of length 24
    return averages

list_of_files = glob("*.rdm")

p = Pool(16) # creates the pool of workers

weight_factors = [random.random() for _ in range(0, 10000)] # creates random list of weights between 0 and 1

partial_process_file = partial(process_file, weight_factors) # adds weight_factors as an argument to be always passed to the process_file
results = p.map(partial_process_file, list_of_files) # perform the calculations

print(results)

Keep in mind that the function parameter corresponding to the list your code will iterate over (f in the process_file function above) must be the last parameter in the function definition.
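
A small sketch of why that order matters (the argument names here are made up): partial always fills positional arguments from the left, so whatever map passes in lands in the last remaining slot.

from functools import partial

def process(weights, f):              # the iterated argument (f) comes last
    return f

good = partial(process, [0.5, 0.5])   # partial binds [0.5, 0.5] to weights, the leftmost parameter
print(good("file_a.csv"))             # map would supply "file_a.csv" here, and it lands in f

# if the signature were process(f, weights) instead, partial would have bound
# the weights list to f, and the file names from map would end up in weights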

You may also want to iterate over two lists as opposed to just one, which would be normally done with a code of the type:

for i, j in zip(list1, list2):
    print(i + j)

In this case, you can zip the two lists together and pass them to your function as a single list of tuples. As an example, suppose that, in the file-processing example, you want to add a certain value to all values in each file before calculating the columns' weighted averages. Here is how the previous code could be modified for this task:

from glob import glob
from numpy import genfromtxt
from multiprocessing import Pool
from functools import partial
import random
 
def process_file(weights, file_and_adding_factor_tuple):
    f, adding_factor = file_and_adding_factor_tuple
    data = genfromtxt(f, delimiter=' ') 
    n = len(data) 
    data_transposed = [list(x) for x in zip(*data)] # transposed data
    averages = [sum((l + adding_factor) * w for l, w in zip(line, weights)) / n for line in data_transposed] # weighted average of each line (former column); vector of length 24
    return averages 
 
list_of_files = glob("*.rdm") 

p = Pool(16) # creates the pool of workers
  
weight_factors = [random.random() for _ in range(0, 10000)] # creates random list of weights between 0 and 1
adding_factors = [random.random() for _ in range(0, len(list_of_files))] # creates random list of numbers between 0 and 1
 
partial_process_file = partial(process_file, weight_factors) # adds weight_factors as an argument always passed to process_file
results = p.map(partial_process_file, zip(list_of_files, adding_factors)) # perform the calculations
 
print(results)
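
If you are running Python 3, Pool.starmap is another option worth knowing about: it unpacks each tuple into separate arguments, so the manual unpacking at the top of process_file is no longer needed. A minimal sketch reusing the names from the example above:

# Python 3 only: starmap calls process_file(weight_factors, f, adding_factor) for each tuple
def process_file(weights, f, adding_factor):
    ...  # same body as above, minus the tuple unpacking

partial_process_file = partial(process_file, weight_factors)
results = p.starmap(partial_process_file, zip(list_of_files, adding_factors))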

There are other ways of parallelizing code in Python, as well as other reasons for doing so. Some of those will be addressed in future posts.
