
Spark & Scala: Generate DataSet (or Dataframe) with given size

For evaluation purposes, I need a function that creates a dummy Dataset (or, alternatively, a DataFrame) initialized with random numbers. The number of rows and columns should be parameterized.

I came up with a solution, but it is absurdly slow (5.3 s for 10 rows with 100 columns):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand
import scala.util.Random

def createDummyDataset(rows: Int, columns: Int, spark: SparkSession) = {
  import spark.implicits._

  // first column: random doubles generated on the driver
  var ds = Seq.fill(rows)(Random.nextDouble).toDF()

  // remaining columns are appended one by one
  if (columns > 1) {
    for (i <- 2 to columns) {
      ds = ds.withColumn(i.toString, rand)
    }
  }
  ds
}
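For reference, timings like the one above can be measured with a small harness such as the following (just a sketch, not the exact measurement; count() forces evaluation since DataFrame transformations are lazy):

// Sketch of a timing harness; count() materializes the DataFrame.
val t0 = System.nanoTime()
val ds = createDummyDataset(rows = 10, columns = 100, spark)
ds.count()
println(f"elapsed: ${(System.nanoTime() - t0) / 1e9}%.1f s")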

Is that due to the architecture of Spark, or am I doing something completely wrong? Is there a much better way?

I guess a better way would be to define some kind of matrix and convert it to a Dataset in one go, but I was not able to figure that out.

System: Spark 2.1.0, Scala 2.11.8, Ubuntu 16.04, i5-6300U, 32 GB RAM

asked by Boern


2 Answers

Adding columns one at a time to an existing DataFrame causes a lot of Spark-related overhead, because each withColumn call produces a new DataFrame with an updated query plan.

It is better to create a 2D collection locally and then parallelize it all in one go:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import scala.util.Random

// build all the random data on the driver, then parallelize it in one step
val data = (0 until rows).map(_ => Seq.fill(columns)(Random.nextDouble))
val rdd = sc.parallelize(data.map(Row.fromSeq))

// an RDD[Row] needs an explicit schema to become a DataFrame
val schema = StructType((1 to columns).map(i => StructField(i.toString, DoubleType, nullable = false)))
val df = spark.createDataFrame(rdd, schema)
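Wrapped into a function with the question's signature (the wrapper and its name are only illustrative, not part of this answer), it might look like:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import scala.util.Random

// Illustrative wrapper around the approach above: generate locally, parallelize once.
def createRandomDF(rows: Int, columns: Int, spark: SparkSession): DataFrame = {
  val data = (0 until rows).map(_ => Row.fromSeq(Seq.fill(columns)(Random.nextDouble)))
  val schema = StructType((1 to columns).map(i => StructField(i.toString, DoubleType, nullable = false)))
  spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
}

// e.g. createRandomDF(10, 100, spark).show(3)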
answered by ImDarrenG


Based on ImDarrenG's answer, but here the output is a DataFrame with n rows and m columns, one named Double column per value.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import scala.util.Random

def start(rows: Int, cols: Int, spark: SparkSession): Unit = {

  // generate the random values on the driver
  val data = (1 to rows).map(_ => Seq.fill(cols)(Random.nextDouble))

  // one Double column per value, named "1" .. cols
  val sch = StructType((1 to cols).map(i => StructField(i.toString, DoubleType, nullable = true)))

  // parallelize once and apply the schema
  val rdd = spark.sparkContext.parallelize(data.map(x => Row(x: _*)))
  val df = spark.createDataFrame(rdd, sch)

  df.printSchema()

  spark.stop()
}

Running on Spark 2.1.0, Scala 2.11.8, Fedora Scientific, i5-5200U (4 cores), 16 GB RAM.

For 10 rows × 100 columns the average elapsed time was 0.9 s.
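As a further sketch (not from either answer): Spark can also generate the random values itself via spark.range combined with the rand column function, which avoids building the data on the driver at all:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.rand

// Sketch: let Spark generate the random values on the executors with rand(),
// producing one column per index instead of building the rows on the driver.
def randomDF(rows: Int, cols: Int, spark: SparkSession): DataFrame =
  spark.range(rows.toLong).select((1 to cols).map(i => rand().as(i.toString)): _*)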

answered by geo


