 

How to compare 2 columns and concatenate in Scala

This is my text file that is an input to the program:

Id       Title Copy
B2002010 gyh   1
D2001001 abc   12
M2003005 zxc   3
D2002003 qwe   13
M2001002 efg   1
D2001004 asd   6
D2003005 zxc   3
M2001006 wer   6
D2001006 wer   6
B2004008 sxc   10
D2002007 sdf   9
D2004008 sxc   10

ID is formatted as Xyyyyrrr where:

  • X is B => Book or M => Magazine
  • yyyy is the year
  • rrr is a random number.
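Given this layout, the grouping key (the type letter plus the year) can be derived by dropping the three trailing random digits from the ID. A minimal sketch; the helper name `yearKey` is illustrative, not from the original code:

```scala
// Hypothetical helper: the grouping key is the ID of the form Xyyyyrrr
// with the 3 trailing random digits dropped, leaving Xyyyy.
def yearKey(id: String): String = id.dropRight(3)
```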

What I have to do is obtain the total number of copies of books or magazines that are from the same year. In addition, I need a small data-cleansing step for the "Copy" column: if I find something other than a number, I replace it with a '0'.

My Spark project is in Eclipse, using Maven and the Scala IDE, and I need to use a MapReduce-style approach.

I have started with a map function that splits the text file.

This is the code I started:

package bd.spark_app

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object alla {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("trying")
    val sc = new SparkContext(conf)
    val x = sc.textFile("/home/hadoopusr/sampledata")

    // Key: the ID minus its 3 trailing random digits; value: the Copy column
    x.map(_.split(" ")).foreach(r =>
      println(r(0).dropRight(3), r(2))
    )

    sc.stop()
  }
}

This is the result of the map function shown above:

(B2002,1)
(D2001,12)
(M2003,3)
(D2002,13)
(M2001,1)
(D2001,6)
(D2003,3)
(M2001,6)
(D2001,6)
(B2004,10)
(D2002,9)
(D2004,10)
(M2004,11)
(D2004,11)

I just need some sort of reduce function that will group the books and magazines from the same year, add their copy counts together, and check that the values in the "Copy" column are numbers.

Example: with records (B2002,12) and (B2002,16) the result should be (B2002,28).
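This combining step can be checked locally with plain Scala, no Spark needed; `sumByKey` below is a hypothetical stand-in for the behaviour being asked for, not part of the Spark API:

```scala
// Hypothetical local stand-in for a per-key reduce:
// group the pairs by key and sum the values for each key.
def sumByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
  pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
```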

Amel Bent asked Nov 29 '25


1 Answer

Method "reduceByKey" can be used:

val converted = x.map(_.split(" ")).map(r => (r(0).dropRight(3), r(2).toInt))
val result = converted.reduceByKey(_ + _)

Output:

(M2001,7)
(D2001,24)
(M2003,3)
(D2003,3)
(D2002,22)
(D2004,10)
(B2002,1)
(B2004,10)
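Note that the code above does not yet apply the cleansing rule from the question (non-numeric "Copy" values should become 0): `r(2).toInt` would throw on bad input. A sketch of that guard using `scala.util.Try`; in the Spark job, only the map step changes, and `reduceByKey` stays the same:

```scala
import scala.util.Try

// The cleansing rule as a pure function: non-numeric Copy values become 0.
def cleanCopy(s: String): Int = Try(s.trim.toInt).getOrElse(0)

// In the Spark job this would replace the unguarded toInt:
//   val converted = x.map(_.split(" "))
//     .map(r => (r(0).dropRight(3), cleanCopy(r(2))))
//   val result = converted.reduceByKey(_ + _)
```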

Note: the input file looks like delimited text, so it may be better to read it with "spark.read" (for example, the CSV reader with a custom separator) and work with a DataFrame instead of an RDD.

pasha701 answered Dec 01 '25


