
How to do parallel processing in pyspark

I want to do parallel processing in a for loop using PySpark.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

# Bucket-name prefixes; each maps to a gs://<name>-data path
data = ["a", "b", "c"]

for i in data:
    try:
        df = spark.read.parquet('gs://' + i + '-data')
        df.createOrReplaceTempView("people")
        df2 = spark.sql("""select * from people""")
        df2.show()
    except Exception as e:
        print(e)
        continue

The above script works fine, but I want to run the iterations in parallel in PySpark, the way it is possible in Scala.

Asked Sep 06 '25 by Amol

2 Answers

Spark itself runs jobs in parallel, but if you still want parallel execution in the driver code you can use simple Python code for parallel processing to do it (this was tested on Databricks only).

data = ["a", "b", "c"]

from multiprocessing.pool import ThreadPool

# A pool of 10 driver threads; each thread submits its own Spark job
pool = ThreadPool(10)

def fun(x):
    try:
        # `sqlContext` is predefined in Databricks notebooks; elsewhere use spark.createDataFrame
        df = sqlContext.createDataFrame(
            [(1, 2, x), (2, 5, "b"), (5, 6, "c"), (8, 19, "d")],
            ("st", "end", "ani"),
        )
        df.show()
    except Exception as e:
        print(e)

pool.map(fun, data)

I have changed your code a bit, but this is basically how you can run parallel tasks. If you have some flat files that you want to process in parallel, just make a list with their names and pass it into pool.map(fun, data).

Change the function fun as needed; a sketch for the question's files follows below.
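For instance, here is a minimal sketch of adapting fun to the question's use case, reading the gs:// parquet paths in parallel (assuming the global spark session from the question is available to the driver threads, as it is in Databricks notebooks):

from multiprocessing.pool import ThreadPool

# Bucket-name prefixes from the question; each maps to a gs://<name>-data path
data = ["a", "b", "c"]

def fun(name):
    try:
        # Each call submits its own Spark job; the driver threads run them concurrently
        df = spark.read.parquet('gs://' + name + '-data')
        df.show()
    except Exception as e:
        print(e)

pool = ThreadPool(10)
pool.map(fun, data)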

For more details on the multiprocessing module check the documentation.

Similarly, if you want to do it in Scala you will need the following imports:

import scala.concurrent.{Future, Await}

For a more detailed understanding, check this out. The code is for Databricks, but with a few changes it will work in your environment.

Answered Sep 07 '25 by Andy_101


Here's a parallel loop in PySpark using Azure Databricks.

import socket

def getsock(i):
    # Returns the IP address of the executor this task runs on
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    ip = s.getsockname()[0]
    s.close()
    return ip

# Distribute 10 dummy elements and run getsock on them in parallel on the executors
rdd1 = sc.parallelize(list(range(10)))
parallel = rdd1.map(getsock).collect()

On platforms other than Azure Databricks you may need to create the Spark context sc yourself; on Azure Databricks the variable exists by default.
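If sc is not predefined in your environment, a minimal sketch of obtaining it from a SparkSession (the app name here is just a placeholder):

from pyspark.sql import SparkSession

# Build (or reuse) a session and take its underlying SparkContext as `sc`
spark = SparkSession.builder.appName('myAppName').getOrCreate()
sc = spark.sparkContext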

Coding it up like this only makes sense if the code that is executed in parallel (getsock here) does not itself contain code that is already parallel. For instance, if getsock contained code that goes through a PySpark DataFrame, that code would already run in parallel, so it would probably not make sense to also "parallelize" that loop.

Answered Sep 07 '25 by vaudt