Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Replacing multiprocessing pool.map with mpi4py

I'm a beginner in using MPI, and I'm still going through the documentation. However, there's very little to work on when it comes to mpi4py. I have written a code that currently uses the multiprocessing module to run on many cores, but I need replace this with mpi4py so that I can use more than one node to run my code. My code is below, when using the multiprocessing module, and also without.

With multiprocessing,

import numpy as np
import multiprocessing 


start_time = time.time()

E = 0.1
M = 5
n = 1000
G = 1
c = 1
stretch = [10, 1]


#Point-Distribution Generator Function 
def CDF_inv(x, e, m):
    A = 1/(1 + np.log(m/e))
    if x == 1:
        return m
    elif 0 <= x <= A:
        return e * x / A
    elif A < x < 1:
        return e * np.exp((x / A) - 1)

#Elliptical point distribution Generator Function

def get_coor_ellip(dist=CDF_inv, params=[E, M], stretch=stretch):
    R = dist(random.random(), *params)
    theta = random.random() * 2 * np.pi
    return (R * np.cos(theta) * stretch[0], R * np.sin(theta) * stretch[1])


def get_dist_sq(x_array, y_array):
    return x_array**2 + y_array**2


#Function to obtain alpha

def get_alpha(args):
    zeta_list_part, M_list_part, X, Y = args
    alpha_x = 0
    alpha_y = 0
    for key in range(len(M_list_part)):
        z_m_z_x = X - zeta_list_part[key][0]
        z_m_z_y = Y - zeta_list_part[key][1]
        dist_z_m_z = get_dist_sq(z_m_z_x, z_m_z_y)
        alpha_x += M_list_part[key] * z_m_z_x / dist_z_m_z
        alpha_y += M_list_part[key] * z_m_z_y / dist_z_m_z
    return (alpha_x, alpha_y)

#The part of the process containing the loop that needs to be parallelised, where I use pool.map()

if __name__ == '__main__':
    # n processes, scale accordingly
    num_processes = 10
    pool = multiprocessing.Pool(processes=num_processes)
    random_sample = [CDF_inv(x, E, M)
                     for x in [random.random() for e in range(n)]]
    zeta_list = [get_coor_ellip() for e in range(n)]
    x1, y1 = zip(*zeta_list)
    zeta_list = np.column_stack((np.array(x1), np.array(y1)))
    x = np.linspace(-3, 3, 100)
    y = np.linspace(-3, 3, 100)
    X, Y = np.meshgrid(x, y)
    print len(x)*len(y)*n,'calculations to be carried out.'
    M_list = np.array([.001 for i in range(n)])
    # split zeta_list, M_list, X, and Y
    zeta_list_split = np.array_split(zeta_list, num_processes, axis=0)
    M_list_split = np.array_split(M_list, num_processes)
    X_list = [X for e in range(num_processes)]
    Y_list = [Y for e in range(num_processes)]

    alpha_list = pool.map(
            get_alpha, zip(zeta_list_split, M_list_split, X_list, Y_list))
    alpha_x = 0  
    alpha_y = 0
    for e in alpha_list:
        alpha_x += e[0] * 4 * G / (c**2)
        alpha_y += e[1] * 4 * G / (c**2)

print("%f seconds" % (time.time() - start_time))

Without multiprocessing,

import numpy as np


E = 0.1
M = 5
G = 1
c = 1
M_list = [.1 for i in range(n)]

#Point-Distribution Generator Function 

def CDF_inv(x, e, m):
    A = 1/(1 + np.log(m/e))
    if x == 1:
        return m
    elif 0 <= x <= A:
        return e * x / A
    elif A < x < 1:
        return e * np.exp((x / A) - 1)



n = 1000
random_sample = [CDF_inv(x, E, M)
                 for x in [random.random() for e in range(n)]]
stretch = [5, 2]

#Elliptical point distribution Generator Function

def get_coor_ellip(dist=CDF_inv, params=[E, M], stretch=stretch):
    R = dist(random.random(), *params)
    theta = random.random() * 2 * np.pi
    return (R * np.cos(theta) * stretch[0], R * np.sin(theta) * stretch[1])

#zeta_list is the list of coordinates of a distribution of points
zeta_list = [get_coor_ellip() for e in range(n)]
x1, y1 = zip(*zeta_list)
zeta_list = np.column_stack((np.array(x1), np.array(y1)))

#Creation of a X-Y Grid
x = np.linspace(-3, 3, 100)
y = np.linspace(-3, 3, 100)
X, Y = np.meshgrid(x, y)

def get_dist_sq(x_array, y_array):
    return x_array**2 + y_array**2


#Calculation of alpha, containing the loop that needs to be parallelised.

alpha_x = 0
alpha_y = 0
for key in range(len(M_list)):
    z_m_z_x = X - zeta_list[key][0]
    z_m_z_y = Y - zeta_list[key][1]
    dist_z_m_z = get_dist_sq(z_m_z_x, z_m_z_y)
    alpha_x += M_list[key] * z_m_z_x / dist_z_m_z
    alpha_y += M_list[key] * z_m_z_y / dist_z_m_z
alpha_x *= 4 * G / (c**2)
alpha_y *= 4 * G / (c**2)

Basically what my code does is, it first generates a list of points that follow a certain distribution. Then I apply an equation to obtain the quantity 'alpha' using different relations between the distances of the points. The part that requires parallelisation is the single for loop involved in the calculation of alpha. What I want to do is to use mpi4py instead of multiprocessing to do this, and I am not sure how to get this going.

like image 367
ThunderFlash Avatar asked Oct 16 '25 07:10

ThunderFlash


2 Answers

Transforming the multiprocessing.map version to MPI can be done using scatter / gather. In your case it is useful, that you already prepare the input list into one chunk for each rank. The main difference is, that all code gets executed by all ranks in the first place, so you must make everything that should be done only by the maste rank 0 conidtional.

if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    if comm.rank == 0:
        random_sample = [CDF_inv(x, E, M)
                         for x in [random.random() for e in range(n)]]
        zeta_list = [get_coor_ellip() for e in range(n)]
        x1, y1 = zip(*zeta_list)
        zeta_list = np.column_stack((np.array(x1), np.array(y1)))
        x = np.linspace(-3, 3, 100)
        y = np.linspace(-3, 3, 100)
        X, Y = np.meshgrid(x, y)
        print len(x)*len(y)*n,'calculations to be carried out.'
        M_list = np.array([.001 for i in range(n)])
        # split zeta_list, M_list, X, and Y
        zeta_list_split = np.array_split(zeta_list, comm.size, axis=0)
        M_list_split = np.array_split(M_list, comm.size)
        X_list = [X for e in range(comm.size)]
        Y_list = [Y for e in range(comm.size)]
        work_list = list(zip(zeta_list_split, M_list_split, X_list, Y_list))
    else:
        work_list = None

    my_work = comm.scatter(work_list)
    my_alpha = get_alpha(my_work)

    alpha_list = comm.gather(my_alpha)
    if comm.rank == 0:
        alpha_x = 0  
        alpha_y = 0
        for e in alpha_list:
            alpha_x += e[0] * 4 * G / (c**2)
            alpha_y += e[1] * 4 * G / (c**2)

This works fine as long as each processor gets a similar amount of work. If communication becomes an issue, you might want to split up the data generation among processors instead of doing it all on the master rank 0.

Note: Some things about the code are bogus, e.g. alpha_[xy] ends up as np.ndarray. The serial version runs into an error.

like image 153
Zulan Avatar answered Oct 18 '25 21:10

Zulan


For people who are still interested in similar subjects, I highly recommend having a look at the MPIPoolExecutor() class here and the documentation is here.

like image 24
Quarkonia Avatar answered Oct 18 '25 23:10

Quarkonia



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!