Hi, I have this code in a notebook and I'm trying to write Python Spark code:

mydataNoSQL.createOrReplaceTempView("mytable")
spark.sql("SELECT * from mytable")
return mydataNoSQL

def getsameData(df, spark):
    result = spark.sql("select * from mytable where temperature is not null")
    return result.rdd.sample(False, 0.1).map(lambda row: row.temperature)

I need an RDD instance, but I am getting a <class 'pyspark.rdd.PipelinedRDD'>.
Any help will be welcome.
pyspark.rdd.PipelinedRDD is a subclass of RDD, so it has all the APIs defined on RDD. That is, a PipelinedRDD is just a special type of RDD that is created when you run a map function on an RDD.
For example, take a look at the snippet below.
>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD
So you should just treat your pyspark.rdd.PipelinedRDD as an RDD in your code.
There is no explicit cast support in Python, as it is a dynamically typed language. To forcefully convert your pyspark.rdd.PipelinedRDD to a plain RDD, you can collect the RDD and parallelize it back:
>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
Running collect() on an RDD may cause a MemoryError on the driver if the RDD's data is large.