 

How to convert a Spark RDD to a NumPy array?

I have read a text file with the Spark context's textFile; the file is a CSV. The testRdd below has the same format as my RDD.

testRdd = [[1.0,2.0,3.0,4.0,5.0,6.0,7.0],
[0.0,0.1,0.3,0.4,0.5,0.6,0.7],[1.1,1.2,1.3,1.4,1.5,1.6,1.7]]

I want to convert the above RDD into a NumPy array so I can feed it into my machine learning model.

When I tried the following:

 feature_vector = numpy.array(testRdd).astype(numpy.float32)

It gives me the following TypeError:

TypeError: float() argument must be a string or a number

How should I convert the Spark RDD into a NumPy array?

Vamsi Nimmala asked Jan 22 '26 23:01

2 Answers

You'll have to collect the data to the driver (your local machine) before calling numpy.array:

import numpy as np

a = np.array(testRdd.collect())
print(a)
#array([[ 1. ,  2. ,  3. ,  4. ,  5. ,  6. ,  7. ],
#       [ 0. ,  0.1,  0.3,  0.4,  0.5,  0.6,  0.7],
#       [ 1.1,  1.2,  1.3,  1.4,  1.5,  1.6,  1.7]])

Or if you want each row as a separate array:

b = testRdd.map(np.array).collect()
print(b)
#[array([ 1.,  2.,  3.,  4.,  5.,  6.,  7.]),
# array([ 0. ,  0.1,  0.3,  0.4,  0.5,  0.6,  0.7]),
# array([ 1.1,  1.2,  1.3,  1.4,  1.5,  1.6,  1.7])]
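
Once the data is collected to the driver, the asker's original astype call works as intended. A minimal sketch, assuming sc is an active SparkContext and testRdd is built from the list in the question:

import numpy as np

testRdd = sc.parallelize([[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
                          [0.0, 0.1, 0.3, 0.4, 0.5, 0.6, 0.7],
                          [1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7]])

# collect() brings the rows back to the driver as a list of lists;
# only then can NumPy build a numeric array and cast it
feature_vector = np.array(testRdd.collect()).astype(np.float32)
print(feature_vector.shape)   # (3, 7)
print(feature_vector.dtype)   # float32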
pault answered Jan 24 '26 13:01


I had the same problem, and collect() is not very efficient for large datasets.

For me, writing one NumPy file per partition directly on the executors worked well, and loading the files back with NumPy is no problem (a loading sketch follows the code below). The number of resulting files equals the number of partitions.

In my case I had to copy the files into HDFS, as I have no direct access to the executor nodes:

from pyspark.sql.types import *
from pyspark.sql.functions import spark_partition_id

def write_numpy(rows):
  import numpy as np
  from pyspark.taskcontext import TaskContext
  import os

  # name each output file after the partition that produced it
  ctx = TaskContext()
  partition_id = ctx.partitionId()

  local_path = "/tmp/test" + str(partition_id) + ".npy"
  hdfs_dest_path = "/tmp/test/"
  # rows is an iterator over the partition, so materialize it first;
  # dtype=object because each Row mixes an int (ID) with a list (TS)
  np.save(local_path, np.array(list(rows), dtype=object))
  # copy the local file to HDFS, since the executors' local disks are not accessible
  os.system("hadoop fs -put " + local_path + " " + hdfs_dest_path)

schema = StructType([StructField("ID", IntegerType()), StructField("TS", ArrayType(IntegerType()))])
data = spark.createDataFrame(
    spark.sparkContext.parallelize(range(1, 1999)).map(lambda x: (x, list(range(1, 100)))),
    schema)

data.rdd.foreachPartition(write_numpy)
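
To read the per-partition files back into a single array, the .npy files can be fetched from HDFS and concatenated. A minimal sketch, assuming the files have been copied to a hypothetical local directory /tmp/test_local (e.g. with hadoop fs -get /tmp/test /tmp/test_local):

import glob
import numpy as np

# one .npy file per partition, fetched from HDFS to a local directory
files = sorted(glob.glob("/tmp/test_local/*.npy"))

# allow_pickle=True because the per-partition arrays hold Row objects, not plain numbers
parts = [np.load(f, allow_pickle=True) for f in files]
full = np.concatenate(parts)
print(full.shape, len(files))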
koenigp answered Jan 24 '26 11:01


