I'm a newbie on Spark (My version is 1.6.0) and now I'm trying to solve the problem given below:
Suppose there are two source files:
Now we need to insert a new column into A, given the logic below:
I have already read in the files and converted them into data frames. For the first situation, I got the result by left outer joining them together. But I cannot find good way in the next step.
My current trying is to build a new data frame by joining A and B using a less strict condition. However I've no clue how to update the current data frame from the other one. Or is there any other more intuitive and efficient way to tackle the whole problem?
Thanks for all the answers.
-----------------------------Update on 20160309--------------------------------
Finally accepted @mlk 's answer. Still great thanks to @zero323 for his/her great comments on UDF versus join, the Tungsten code generation is really another problem we are facing now. But since we need to do scores of lookup and average 4 conditions for every lookup, the former solution is more suitable...
The final solution is somehow looks like below snippet:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
  (aStr: String, bStr: String, cStr: String) =>
    tableBroadcast.value.find {
      case TableType(a, b, c, _) =>
        (a == aStr && b == bStr && c == cStr) ||
        (a == aStr && b == bStr)
    }.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
PySpark RDD – lookup()lookup() is an action in pair RDD, which is used to return all the values that are associated with a key in a list. It is performed on single pair RDD. It takes a key as a parameter.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
How to create Broadcast variable. The Spark Broadcast is created using the broadcast(v) method of the SparkContext class. This method takes the argument v that you want to broadcast.
In Spark, the groupByKey function is a frequently used transformation operation that performs shuffling of data. It receives key-value pairs (K, V) as an input, group the values based on key and generates a dataset of (K, Iterable ) pairs as an output.
As B is small I think the best way to do this would be a broadcast variable and user defined function.
// However you get the data...
case class BType( A2: Int, B2: Int, C2 : Int, D2 : String)
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200"))
val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER")
// Broadcast B so all nodes have a copy of it.
val Bbradcast = sc.broadcast(B)
// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {( a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 }
// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show
Just for reference a solution without UDFs:
val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1"))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2"))
// Match A, B and C
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1")
// Match A and B mismatch C
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2")
val toDrop = b1.columns ++ b2.columns
toDrop.foldLeft(a
  .join(b1, expr1, "leftouter")
  .join(b2, expr2, "leftouter")
  // If there is match on A, B, C then D_1 should be not NULL
  // otherwise we fall-back to D_2 
  .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c))
This assumes there is at most one match in each category (all three columns, or the first two) or duplicate rows in the output are desired.
UDF vs JOIN:
There are multiple factors to consider and there is no simple answer here:
Cons:
joins require passing data twice to the worker nodes. As for now broadcasted tables are not cached (SPARK-3863) and it is unlikely to change in the nearest future (Resolution: Later).join operation is applied twice even if there is a full match.Pros:
join and coalesce are transparent to the optimizer while UDFs are not.If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With