Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

replace for loop to parallel process in pyspark

I am using for loop in my script to call a function for each element of size_DF(data frame) but it is taking lot of time. I tried by removing the for loop by map but i am not getting any output.
size_DF is list of around 300 element which i am fetching from a table.

Using For:

import call_functions

newObject = call_functions.call_functions_class()
size_RDD = sc.parallelize(size_DF) 

if len(size_DF) == 0:
    print "No record present in the truncated list"
else:

    for row in size_DF:
        length = row[0]
        print "length: ", length
        insertDF = newObject.full_item(sc, dataBase, length, end_date)

Using Map

if len(size_DF) == 0:
    print "No record present in the list"
else:
    size_RDD.mapPartition(lambda l: newObject.full_item(sc, dataBase, len(l[0]), end_date))

newObject.full_item(sc, dataBase, len(l[0]), end_date) In full_item() -- I am doing some select ope and joining 2 tables and inserting the data into a table.

Please help me and let me know what i am doing wrong.

like image 579
A.N.Gupta Avatar asked Oct 21 '25 20:10

A.N.Gupta


1 Answers

pyspark.rdd.RDD.mapPartition method is lazily evaluated. Usually to force an evaluation, you can a method that returns a value on the lazy RDD instance that is returned.

There are higher-level functions that take care of forcing an evaluation of the RDD values. e.g. pyspark.rdd.RDD.foreach

Since you don't really care about the results of the operation you can use pyspark.rdd.RDD.foreach instead of pyspark.rdd.RDD.mapPartition.

def first_of(it):
    for first in it:
        return first
    return []

def insert_first(it):
    first = first_of(it)
    item_count = len(first)
    newObject.full_item(sc, dataBase, item_count, end_date)


if len(size_DF) == 0:
    print('No record present in the truncated list')
else:
    size_DF.forEach(insert_first)
like image 125
Oluwafemi Sule Avatar answered Oct 23 '25 11:10

Oluwafemi Sule



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!