I've written the code below as a simple example of parallel computing in Cython. In my example, I create two worker objects and have them run in parallel. I would like to generalize this implementation to use a variable number of workers. The problem is that I can't find a way to store an array of Worker objects and access them within a nogil block. Is there a way to do it? Obviously I can hack something reasonable together using the technique below (up to some reasonable hard-coded number of workers), but I'd like something a little more elegant and maintainable if it exists.
Here's the code. The key section is under if use_parallel:
# distutils: language = c
# cython: cdivision = True
# cython: boundscheck = False
# cython: wraparound = False
# cython: profile = False
cimport numpy as cnp
import numpy as np
from cython.parallel import parallel, prange
from libc.math cimport sin
cimport openmp
cnp.import_array()
ctypedef cnp.float64_t FLOAT_t
ctypedef cnp.intp_t INT_t
ctypedef cnp.ulong_t INDEX_t
ctypedef cnp.uint8_t BOOL_t
cdef class Parent:
    cdef cnp.ndarray numbers
    cdef unsigned int i
    cdef Worker worker1
    cdef Worker worker2

    def __init__(Parent self, list numbers):
        self.numbers = <cnp.ndarray[FLOAT_t, ndim=1]> np.array(numbers, dtype=float)
        self.worker1 = Worker()
        self.worker2 = Worker()

    cpdef run(Parent self, bint use_parallel):
        cdef unsigned int i
        cdef float best
        cdef int num_threads
        cdef cnp.ndarray[FLOAT_t, ndim=1] numbers = <cnp.ndarray[FLOAT_t, ndim=1]> self.numbers
        cdef FLOAT_t[:] buffer1 = self.numbers[:(len(numbers)//2)]
        buffer_size1 = buffer1.shape[0]
        cdef FLOAT_t[:] buffer2 = self.numbers[(len(numbers)//2):]
        buffer_size2 = buffer2.shape[0]

        # Run the workers
        if use_parallel:
            print 'parallel'
            with nogil:
                for i in prange(2, num_threads=2):
                    if i == 0:
                        self.worker1.run(buffer1, buffer_size1)
                    elif i == 1:
                        self.worker2.run(buffer2, buffer_size2)
        else:
            print 'serial'
            self.worker1.run(buffer1, buffer_size1)
            self.worker2.run(buffer2, buffer_size2)

        # Make sure they both ran
        print self.worker1.output, self.worker2.output

        # Choose the worker that had the best solution
        best = min(self.worker1.output, self.worker2.output)
        return best
cdef class Worker:
    cdef public float output

    def __init__(Worker self):
        self.output = 0.0

    cdef void run(Worker self, FLOAT_t[:] numbers, unsigned int buffer_size) nogil:
        cdef unsigned int i
        cdef unsigned int j
        cdef unsigned int n = buffer_size
        cdef FLOAT_t best
        cdef bint first = True
        cdef FLOAT_t value
        for i in range(n):
            for j in range(n):
                value = sin(numbers[i]*numbers[j])
                if first or (value < best):
                    best = value
                    first = False
        self.output = best
My test script looks like this:
from parallel import Parent
import time
data = list(range(20000))
parent = Parent(data)
t0 = time.time()
output = parent.run(False)
t1 = time.time()
print 'Serial Result: %f' % output
print 'Serial Time: %f' % (t1-t0)
t0 = time.time()
output = parent.run(True)
t1 = time.time()
print 'Parallel Result: %f' % output
print 'Parallel Time: %f' % (t1-t0)
And it produces this output:
serial
-1.0 -1.0
Serial Result: -1.000000
Serial Time: 6.428081
parallel
-1.0 -1.0
Parallel Result: -1.000000
Parallel Time: 4.006907
Finally, this is my setup.py:
from distutils.core import setup
from distutils.extension import Extension
import sys
import numpy
#Determine whether to use Cython
if '--cythonize' in sys.argv:
    cythonize_switch = True
    del sys.argv[sys.argv.index('--cythonize')]
else:
    cythonize_switch = False
#Find all includes
numpy_include = numpy.get_include()
#Set up the ext_modules for Cython or not, depending
if cythonize_switch:
    from Cython.Distutils import build_ext
    from Cython.Build import cythonize
    ext_modules = cythonize([Extension("parallel.parallel", ["parallel/parallel.pyx"],
                                       include_dirs=[numpy_include],
                                       extra_compile_args=['-fopenmp'],
                                       extra_link_args=['-fopenmp'])])
else:
    ext_modules = [Extension("parallel.parallel", ["parallel/parallel.c"],
                             include_dirs=[numpy_include],
                             extra_compile_args=['-fopenmp'],
                             extra_link_args=['-fopenmp'])]
#Create a dictionary of arguments for setup
setup_args = {'name': 'parallel-test',
              'version': '0.1.0',
              'author': 'Jason Rudy',
              'author_email': '[email protected]',
              'packages': ['parallel',],
              'license': 'LICENSE.txt',
              'description': 'Let\'s try some parallel programming in Cython',
              'long_description': open('README.md', 'r').read(),
              'py_modules': [],
              'ext_modules': ext_modules,
              'classifiers': ['Development Status :: 3 - Alpha'],
              'requires': []}
#Add the build_ext command only if cythonizing
if cythonize_switch:
    setup_args['cmdclass'] = {'build_ext': build_ext}
#Finally
setup(**setup_args)
It must be compiled with gcc, which you can do on a Mac with
export CC=gcc
before you run setup.py.
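For reference, a typical in-place build then looks something like this (assuming you want to cythonize and build the extension inside the source tree; adjust to your own setup):
export CC=gcc
python setup.py build_ext --inplace --cythonize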
As far as I can tell, code inside a nogil block cannot index Python objects (nor coerce them, etc.). Consequently, we get the following compiler message when the Workers are stored in a built-in Python list:
cdef list workers
workers = [Worker(), Worker(), Worker(), Worker()]
Message:
Indexing Python object not allowed without gil
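The error appears as soon as the list is indexed inside the nogil block, for example in a loop like this (a minimal sketch of the failing approach, using hypothetical workers/buffers/buf_sizes variables):
with nogil:
    for i in prange(nb_workers, num_threads=nb_workers):
        # workers[i] indexes a Python list, which requires the GIL
        (<Worker>workers[i]).run(buffers[i], buf_sizes[i])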
You could try something like the following (tested; it seems to work fine). The workaround here is to use C-level declarations for everything that is used inside the nogil section:
cdef PyObject ** workers
cdef int * buf_sizes
cdef FLOAT_t ** buffers
And allocate those arrays using the good old malloc from libc.stdlib.
Code:
# distutils: language = c
# cython: cdivision = True
# cython: boundscheck = False
# cython: wraparound = False
# cython: profile = False
cimport numpy as cnp
import numpy as np
from cython.parallel import parallel, prange
from libc.math cimport sin
cimport openmp
cnp.import_array()
ctypedef cnp.float64_t FLOAT_t
ctypedef cnp.intp_t INT_t
ctypedef cnp.ulong_t INDEX_t
ctypedef cnp.uint8_t BOOL_t
# PyObject is the C representation of a Python object
# This allows casts in both ways...
cimport cython
from cpython cimport PyObject
# C memory alloc features
from libc.stdlib cimport malloc, free
cdef FLOAT_t MAXfloat64 = np.float64(np.inf)
cdef class Parent:
    cdef cnp.ndarray numbers
    cdef unsigned int i
    cdef PyObject ** workers
    cdef object py_workers    # Keeps the Worker objects alive; self.workers only stores borrowed pointers
    cdef int nb_workers
    cdef int * buf_sizes
    cdef FLOAT_t ** buffers

    def __init__(Parent self, list numbers, int n_workers):
        self.numbers = <cnp.ndarray[FLOAT_t, ndim=1]> np.array(numbers, dtype=float)
        # Define number of workers
        self.nb_workers = n_workers
        self.workers = <PyObject **>malloc(self.nb_workers*cython.sizeof(cython.pointer(PyObject)))
        # Populate pool; the list holds the references (for correct ref count),
        # since the PyObject* array does not own them
        cdef int i
        self.py_workers = []
        for i in xrange(self.nb_workers):
            self.py_workers.append(Worker())
            self.workers[i] = <PyObject*>self.py_workers[i]
        self.init_buffers()
    cdef init_buffers(Parent self):
        cdef int i, j
        cdef int num_threads
        cdef int pos, pos_end
        cdef int buf_size
        num_threads = self.nb_workers
        buf_size = len(self.numbers) // num_threads
        # Init buffers
        self.buffers = <FLOAT_t **>malloc(self.nb_workers * cython.sizeof(cython.pointer(FLOAT_t)))
        self.buf_sizes = <int *>malloc(self.nb_workers * cython.sizeof(int))
        pos = 0
        buf_size = len(self.numbers) // num_threads
        for i in xrange(self.nb_workers):
            # If we are treating the last worker, do everything that is left
            if (i == self.nb_workers-1):
                buf_size = len(self.numbers) - pos
            self.buf_sizes[i] = buf_size
            pos_end = pos + buf_size
            self.buffers[i] = <FLOAT_t *>malloc(buf_size * cython.sizeof(FLOAT_t))
            for j in xrange(pos, pos_end):
                self.buffers[i][j-pos] = <FLOAT_t>self.numbers[j]
            pos = pos + buf_size
    cpdef run(Parent self, bint use_parallel):
        cdef int i
        cdef FLOAT_t best

        # Run the workers
        if use_parallel:
            print 'parallel'
            with nogil:
                for i in prange(self.nb_workers, num_threads=self.nb_workers):
                    # Changed "FLOAT_t[:]" python object to C array "FLOAT_t *"
                    (<Worker>self.workers[i]).run(<FLOAT_t *>self.buffers[i], self.buf_sizes[i])
        else:
            print 'serial'
            for i in xrange(self.nb_workers):
                (<Worker>self.workers[i]).run(<FLOAT_t *>self.buffers[i], self.buf_sizes[i])

        # Make sure they ran
        for i in xrange(self.nb_workers):
            print (<Worker>self.workers[i]).output

        # Choose the worker that had the best solution
        best = MAXfloat64
        for i in xrange(self.nb_workers):
            if ((<Worker>self.workers[i]).output < best):
                best = (<Worker>self.workers[i]).output
        return best
cdef class Worker:
    cdef public float output

    def __init__(Worker self):
        self.output = 0.0

    # Changed "FLOAT_t[:]" python object to C dyn array "FLOAT_t *"
    cdef void run(Worker self, FLOAT_t * numbers, unsigned int buffer_size) nogil:
        cdef unsigned int i, j
        cdef unsigned int n = buffer_size
        cdef FLOAT_t best
        cdef bint first = True
        cdef FLOAT_t value
        # Added initialization
        best = MAXfloat64
        for i in range(n):
            for j in range(n):
                value = sin(numbers[i]*numbers[j])
                if first or (value < best):
                    best = value
                    first = False
        self.output = best
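One caveat (my addition, not part of the tested code above): the arrays allocated with malloc are never freed, so a long-lived or repeatedly constructed Parent would leak them. A __dealloc__ method on Parent along these lines should release them:
    def __dealloc__(Parent self):
        cdef int i
        # Free each per-worker buffer, then the bookkeeping arrays themselves
        if self.buffers != NULL:
            for i in range(self.nb_workers):
                free(self.buffers[i])
            free(self.buffers)
        free(self.buf_sizes)   # free(NULL) is a no-op, so this is safe even if __init__ failed early
        free(self.workers)
        # The Worker objects themselves are released normally via self.py_workers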
Test:
from parallel import Parent
import time
data = list(range(20000))
parent = Parent(data, 7)
t0 = time.time()
output = parent.run(False)
t1 = time.time()
print 'Serial Result: %f' % output
print 'Serial Time: %f' % (t1-t0)
t0 = time.time()
output = parent.run(True)
t1 = time.time()
print 'Parallel Result: %f' % output
print 'Parallel Time: %f' % (t1-t0)
Output:
serial
-1.0
-1.0
-1.0
-1.0
-1.0
-1.0
-1.0
Serial Result: -1.000000
Serial Time: 2.741364
parallel
-1.0
-1.0
-1.0
-1.0
-1.0
-1.0
-1.0
Parallel Result: -1.000000
Parallel Time: 0.536419
Hope this fits what you were asking for, or at least gives you some ideas. Please share your final implementation!