Creating a dataframe of rows of many fields in Spark

I need to create a DataFrame whose rows contain around 30 fields (int, double, and string). As a first step I created a DataFrame with a single row, and that works:

var res_df = sc.parallelize(Seq((
  results_combine(0),
  results_combine(1),
  results_combine(2),
  results_combine(3),
  results_combine(4),
  results_combine(5),
  results_combine(6),
  results_combine(7),
  results_combine(8),
  results_combine(9),
  results_combine(10)
))).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")

However, when I tried to add more elements to the tuple inside the Seq, I got an error because Scala tuples are limited to 22 elements. How can I work around this?
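The cap comes from Scala itself, not Spark: the standard library only defines tuple classes up to `Tuple22`, while a `Seq` can hold any number of values. A minimal, Spark-free sketch of the distinction:

```scala
// Scala's tuple classes stop at Tuple22, so a literal tuple with 23 or
// more elements will not compile; this is a language limit, not Spark's.
// A Seq has no such cap, which is why building rows from a Seq of values
// (as the answer below does with Row) handles arbitrarily wide rows.
val wideRow: Seq[Any] = (1 to 30).toVector
println(wideRow.length) // prints 30
```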

asked Oct 16 '25 by Mustafa Orkun Acar

1 Answer

Here's an example using the explicit Row and schema-definition APIs.

The (mildly) annoying part is setting up the schema object; see StructField and StructType.

Hopefully this works under Scala 2.10.x!

scala> import org.apache.spark.sql.{DataFrame,Row}
import org.apache.spark.sql.{DataFrame, Row}

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val alphabet = ('a' to 'z').map( _ + "" ) // for column labels
alphabet: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)

scala> val row1 = Row( 1 to 26 : _* )
row1: org.apache.spark.sql.Row = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26]

scala> val row2 = Row( 26 to 1 by -1 : _* )
row2: org.apache.spark.sql.Row = [26,25,24,23,22,21,20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1]

scala> val schema = StructType( alphabet.map( label =>  StructField(label, IntegerType, false) ) )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false), StructField(d,IntegerType,false), StructField(e,IntegerType,false), StructField(f,IntegerType,false), StructField(g,IntegerType,false), StructField(h,IntegerType,false), StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(k,IntegerType,false), StructField(l,IntegerType,false), StructField(m,IntegerType,false), StructField(n,IntegerType,false), StructField(o,IntegerType,false), StructField(p,IntegerType,false), StructField(q,IntegerType,false), StructField(r,IntegerType,false), StructField(s,IntegerType,false), StructField(t,IntegerType,false), StructField(u,IntegerType,false), StructField(v,IntegerTyp...

scala> val rdd = hiveContext.sparkContext.parallelize( Seq( row1, row2 ) )
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[5] at parallelize at <console>:23

scala> val df = hiveContext.createDataFrame( rdd, schema )
df: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int, d: int, e: int, f: int, g: int, h: int, i: int, j: int, k: int, l: int, m: int, n: int, o: int, p: int, q: int, r: int, s: int, t: int, u: int, v: int, w: int, x: int, y: int, z: int]

scala> df.show()
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|  g|  h|  i|  j|  k|  l|  m|  n|  o|  p|  q|  r|  s|  t|  u|  v|  w|  x|  y|  z|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26|
| 26| 25| 24| 23| 22| 21| 20| 19| 18| 17| 16| 15| 14| 13| 12| 11| 10|  9|  8|  7|  6|  5|  4|  3|  2|  1|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
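The question mentions mixed int/double/string fields, while the transcript above uses IntegerType throughout. A hedged sketch of how the schema and rows change for mixed types, assuming the same `hiveContext` as above (the column names and values here are made up for illustration):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Illustrative mixed-type schema: each StructField names a column,
// gives its Spark SQL type, and says whether it is nullable.
val mixedSchema = StructType(Seq(
  StructField("id",    IntegerType, false),
  StructField("score", DoubleType,  false),
  StructField("label", StringType,  false)
))

// Row values must line up with the schema, position by position.
val mixedRdd = hiveContext.sparkContext.parallelize(Seq(
  Row(1, 0.5, "foo"),
  Row(2, 1.5, "bar")
))

val mixedDf = hiveContext.createDataFrame(mixedRdd, mixedSchema)
```

This requires a running Spark context, so it is a sketch rather than a standalone program; the key point is that `createDataFrame(rdd, schema)` trusts you to keep each Row's values aligned with the schema's types.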
answered Oct 18 '25 by Steve Waldman


