 

Spark: count and percentage for every column's values, exception handling, and loading to Hive DB

In the Scala Spark code below I need to find, for the values of several different columns, their count and the percentage of the total. To do this I currently apply the same withColumn chain to each column: date, usage, payment, dateFinal, usageFinal, paymentFinal.

For every one of these calculations I have to repeat the withColumn calls for the sum and the aggregation. Is there a way to avoid writing

.withColumn("SUM", sum("count").over() ).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100 ).drop("fraction")

for each and every column, as in the code below?

var dateFinalDF = dateFinal.toDF(DateColumn).groupBy(DateColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

var usageFinalDF = usageFinal.toDF(UsageColumn).groupBy(UsageColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

var paymentFinalDF = paymentFinal.toDF(PaymentColumn).groupBy(PaymentColumn).count.withColumn("SUM", sum("count").over()).withColumn("fraction", col("count") / sum("count").over()).withColumn("Percent", col("fraction") * 100).drop("fraction")

My current code is below. I have added conditions for different columns such as date and usage (for example, when the column contains a date I add the count and the other conditions I want). I now want to make this dynamic: all of the column names should go into one YAML file and be read from there. How can I achieve this, and how should I modify my code after reading the YAML file?

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer

object latest {

  def main(args: Array[String]): Unit = {

    var fileList = new ListBuffer[String]()
    var dateList = new ListBuffer[String]()
    var fileL = new ListBuffer[String]()
    var fileL1 = new ListBuffer[String]()

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("hbase sql")
    val sc = new SparkContext(sparkConf)
    val spark1 = SparkSession.builder().config(sc.getConf).getOrCreate()
    val sqlContext = spark1.sqlContext

    import spark1.implicits._

    // Round a double to two decimal places
    def f1(number: Double) = {
      "%.2f".format(number).toDouble
    }
    val udfFunc = udf(f1 _)

    // Add the total (SUM) and a rounded percentage based on the "count" column
    def getCountPercent(df: DataFrame): DataFrame = {
      df.withColumn("SUM", sum("count").over())
        .withColumn("fraction", col("count") / sum("count").over())
        .withColumn("Percent", col("fraction") * 100)
        .withColumn("number", udfFunc(col("Percent")))
        .drop("Percent")
        .drop("fraction")
    }

    // Group by the given column, compute counts and percentages,
    // and collect each row into fileList as a '~'-separated string
    def occurenceCount(df: DataFrame, column: String): Unit = {
      var usageFinalDF = df.groupBy(column).count.transform(getCountPercent)
      for (u <- usageFinalDF.collect()) {
        fileList += column + '~' + u.mkString("~")
      }
    }




    val headerCSV = spark1.sqlContext.read.format("CSV").option("header", "true").option("delimiter", """|""").load("C:\\Users\\ayushgup\\Downloads\\Header3.csv")

    val columns = headerCSV.columns

    val data = spark1.sqlContext.read.format("CSV").option("delimiter", """|""").load("C:/Users/ayushgup/Downloads/home_data_usage_2018122723_1372673.csv").toDF(columns: _*)

    for (coll <- columns.toList) {

      if (coll.toLowerCase().contains("date")) {

        for (datesss <- data.select(coll).collect()) {
          dateList += datesss.toString().slice(1, 8)
        }

        var dateFinalDF = dateList.toList.toDF(coll)
        occurenceCount(dateFinalDF, coll)

      } else if (coll.toLowerCase().contains("usage")) {

        var r = data.select(coll).withColumn(coll, when(col(coll) <= 1026, "<=1gb")
          .when(col(coll) > 1026 && col(coll) < 5130, "1-5gb")
          .when(col(coll) > 5130 && col(coll) < 10260, "5-10gb")
          .when(col(coll) > 10260 && col(coll) < 20520, "10-20gb")
          .when(col(coll) > 20520, ">20gb")
          .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.toLowerCase().contains("paymentamount")) {

        var r = data.select(coll).withColumn(coll, when(col(coll) <= 1500, "1-1500")
          .when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
          .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
          .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
          .when(col(coll) > 2000, ">2000")
          .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.toLowerCase().contains("accounttenure")) {

        var r = data.select(coll).withColumn(coll, when(col(coll) > 1000000 && col(coll) < 5000000, "1-5m")
          .when(col(coll) > 5000000 && col(coll) < 11000000, "5-11m")
          .when(col(coll) > 12000000 && col(coll) < 23000000, "12-23m")
          .when(col(coll) > 24000000 && col(coll) < 35000000, "24-35m")
          .when(col(coll) > 36000000, ">36m")
          .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.toLowerCase().equals("arpu")) {

        var r = data.select(coll).withColumn(coll, when(col(coll) <= 1500, "1-1500")
          .when(col(coll) > 1500 && col(coll) < 1700, "1500-1700")
          .when(col(coll) > 1700 && col(coll) < 1900, "1700-1900")
          .when(col(coll) > 1900 && col(coll) < 2000, "1900-2000")
          .when(col(coll) > 2000, ">2000")
          .otherwise(0)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.equals("DisputeAmount") || coll.equals("ticketsAmount")) {

        var r = data.select(coll).withColumn(coll, when(col(coll) === 0, "0")
          .when(col(coll) > 0, ">0")
          .otherwise(1)).toDF(coll)

        occurenceCount(r, coll)

      } else if (coll.equals("serviceOrdersCreatedLast90Days")) {

        var r = data.select(coll).withColumn(coll, when(col(coll) === 0, "0")
          .when(col(coll) === 1, "1")
          .when(col(coll) === 2, "2")
          .when(col(coll) === 3, "3")
          .when(col(coll) > 3, ">3"))
          .toDF(coll)

        occurenceCount(r, coll)

      } else {

        val actData1 = data.groupBy(coll).count().transform(getCountPercent)
        occurenceCount(actData1, coll)

      }
    }

    val f = fileList.toList
    for (flist <- f) {
      fileL += flist.replaceAll("[\\[\\]]", "")
    }

    var ff = fileL.toDF()

    var df1: DataFrame = ff.selectExpr(
      "split(value, '~')[0] as Attribute",
      "split(value, '~')[1] as Value",
      "split(value, '~')[2] as Count",
      "split(value, '~')[3] as Sum",
      "split(value, '~')[4] as Percent")

  }
}


1 Answer

You can encapsulate all the .withColumn() operations in a function, which returns the DataFrame after applying all operations.

def getCountPercent(df: DataFrame): DataFrame = {
  df.withColumn("SUM", sum("count").over() )
    .withColumn("fraction", col("count") / sum("count").over())
    .withColumn("Percent", col("fraction") * 100 )
    .drop("fraction")
}  
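
The udfFunc in the question exists only to round the percentage to two decimals. If that rounding is wanted, Spark's built-in round function can do it inside the same helper; a minimal variant (getCountPercentRounded is just an illustrative name, and it assumes org.apache.spark.sql.functions._ is imported as in the question):

// Variant of the helper: rounds the percentage with Spark's built-in round()
// instead of a user-defined function; assumes the input already has a "count" column.
def getCountPercentRounded(df: DataFrame): DataFrame = {
  df.withColumn("SUM", sum("count").over())
    .withColumn("fraction", col("count") / sum("count").over())
    .withColumn("Percent", round(col("fraction") * 100, 2))
    .drop("fraction")
}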

Usage:

use .transform() to apply the function:

var dateFinalDF = dateFinal.toDF(DateColumn).groupBy(DateColumn).count.transform(getCountPercent)
var usageFinalDF = usageFinal.toDF(UsageColumn).groupBy(UsageColumn).count.transform(getCountPercent)
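
For the YAML part of the question (keeping the column names outside the code), here is one minimal sketch. It assumes the SnakeYAML library is on the classpath and a hypothetical columns.yml file laid out like this:

columns:
  - date
  - usage
  - paymentamount
  - accounttenure

The names can then be loaded and used to drive the same per-column loop:

import java.io.FileInputStream
import scala.collection.JavaConverters._
import org.yaml.snakeyaml.Yaml

// Parse the YAML file and pull out the "columns" list
// (the file name and the "columns" key are illustrative, not fixed by the question).
val yaml = new Yaml()
val config = yaml.load[java.util.Map[String, java.util.List[String]]](new FileInputStream("columns.yml"))
val yamlColumns: List[String] = config.get("columns").asScala.toList

// Use the configured names instead of the hard-coded headerCSV.columns,
// e.g. for the per-column branching in the question:
for (coll <- yamlColumns) {
  // ... same branching / occurenceCount(...) logic as before
}

The same list could also be passed to toDF(yamlColumns: _*) if you want to drop the separate header CSV entirely.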


