
Parallelizing a for loop with map and reduce in spark with pyspark

In my application, I am creating different DataFrames from data in different locations on S3, and then trying to merge them into a single DataFrame. Right now I am using a for loop for this, but I have a feeling it can be done much more efficiently using map and reduce functions in pyspark. Here's my code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, GroupedData
import pandas as pd
from datetime import datetime


sparkConf = SparkConf().setAppName('myTestApp')
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)

filepath = 's3n://my-s3-bucket/report_date='

date_from = pd.to_datetime('2016-08-01',format='%Y-%m-%d')
date_to = pd.to_datetime('2016-08-22',format='%Y-%m-%d')
datelist = pd.date_range(date_from, date_to)

First = True

#THIS is the for-loop I want to get rid of
for dt in datelist:
    date_string = datetime.strftime(dt, '%Y-%m-%d')
    print('Running the pyspark - Data read for the date - '+date_string)
    df = sqlContext.read.format("com.databricks.spark.csv").options(header = "false", inferschema = "true", delimiter = "\t").load(filepath + date_string + '/*.gz')

    if First:
        First=False
        df_Full = df
    else:
        df_Full = df_Full.unionAll(df)
krackoder asked Oct 21 '25 12:10



1 Answer

Actually, the iterative union, although suboptimal, is not the biggest issue here. A much more serious problem is introduced by schema inference (inferschema = "true").

It not only makes DataFrame creation eager rather than lazy, but also requires a separate scan of the data just for inference. If you know the schema up front, you should provide it as an argument to DataFrameReader:

schema = ...

df = sqlContext.read.format("com.databricks.spark.csv").schema(schema)
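For completeness, a full read with a supplied schema might look like the sketch below. The helper name read_day is hypothetical, and the header and delimiter options are copied from the question; the key point is that inferschema is left out, so creating the DataFrame should not trigger an extra pass over the data.

```python
def read_day(sqlContext, path, schema):
    """Read one day's files with an explicit schema (hypothetical helper).

    `schema` is expected to be a pyspark.sql.types.StructType describing
    the columns; because it is supplied, no inference scan is requested.
    """
    return (sqlContext.read
            .format("com.databricks.spark.csv")
            .options(header="false", delimiter="\t")  # options taken from the question
            .schema(schema)                            # explicit schema instead of inferschema
            .load(path))
```

It would be called once per date, for example read_day(sqlContext, filepath + date_string + '/*.gz', schema).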

Otherwise, you can extract it from the first DataFrame. Combined with well-tuned parallelism, this should work just fine, but if the number of files you fetch is large you should also consider a slightly smarter approach than an iterative union. You'll find an example in my answer to "Spark union of multiple RDDs". It is more expensive but has better general properties.
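One way to avoid a long chain of pairwise unions is to union the underlying RDDs in a single call and rebuild a DataFrame from the result. The sketch below is an assumption about what such an approach could look like, not a copy of the linked answer; the function name union_all is hypothetical, and it assumes every DataFrame shares the schema of the first one.

```python
def union_all(sc, sqlContext, dfs):
    """Combine DataFrames with one SparkContext.union call (sketch).

    Assumes all DataFrames in `dfs` have the same schema as the first.
    Going through RDDs costs a conversion, but the lineage stays flat
    instead of growing with every additional union.
    """
    dfs = list(dfs)
    if not dfs:
        raise ValueError("need at least one DataFrame")
    rdds = [df.rdd for df in dfs]      # drop down to the RDD of rows
    combined = sc.union(rdds)          # a single n-ary union
    return sqlContext.createDataFrame(combined, dfs[0].schema)
```

With the loop from the question, the per-date DataFrames would be collected in a list and passed in once, for example df_Full = union_all(sc, sqlContext, daily_dfs), where daily_dfs is a hypothetical list name.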

Regarding your idea: it is not possible to nest operations on distributed data structures, so if you want to read data inside map you'll have to use an S3 client directly, without utilizing SQLContext.
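To illustrate what reading inside a transformation could involve, here is a rough sketch that parses gzipped, tab-separated bytes with plain Python and applies it per object key. Both function names are hypothetical, fetch_object stands in for whatever S3 client call you choose (it is not defined here), and UTF-8 encoding is an assumption.

```python
import csv
import gzip
import io


def parse_gzipped_tsv(raw_bytes):
    """Decompress gzipped TSV bytes and return the rows as tuples of strings."""
    text = gzip.decompress(raw_bytes).decode("utf-8")  # assumes UTF-8 content
    return [tuple(row) for row in csv.reader(io.StringIO(text), delimiter="\t")]


def rows_from_keys(sc, keys, fetch_object):
    """Distribute object keys and parse each object on the executors.

    `fetch_object(key)` is a hypothetical callable returning the raw bytes
    of one object; it must be serializable and should create its S3 client
    inside the call, since it runs on the workers rather than the driver.
    """
    return sc.parallelize(keys).flatMap(
        lambda key: parse_gzipped_tsv(fetch_object(key)))
```

Note that this yields an RDD of string tuples, so types and column names would still have to be applied afterwards.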

zero323 answered Oct 24 '25 02:10
