I have a Java program that works with a large dataset. The dataset is stored in HDFS (CSV files).
The program works fine, but it is very slow.
What the program does: it reads the CSV lines from HDFS, filters and maps them into MyObject instances, and saves them to Cassandra.
Here is my main method:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public static void main(String[] args) {
    // configure spark
    SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
            .setMaster("local[*]")
            .set("spark.executor.memory", "4g");
    if (args.length > 1)
        sparkConf.set("spark.cassandra.connection.host", args[1]);
    // start a spark context
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    // read text file to RDD
    JavaRDD<String> lines = sc.textFile(args[0]);
    // split each line, keep the rows that pass someFilter, build MyObject instances
    JavaRDD<MyObject> myObjectJavaRDD = lines
            .map(line -> line.split(","))
            .filter(someFilter)
            .map(MyObject::new);
    // write the RDD to the Cassandra table "table" in keyspace "ks"
    javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
}
How can I improve performance?
Thank you for your answers.
Your code doesn't have shuffle issues (except when you have to write out to HDFS), and the default partitioning is defined by the input format: on Hadoop you get one partition per HDFS split, and filter or map don't change the partitioning. If you can filter first, you could see some improvement, since lines that will be discarded never get split (note that someFilter would then have to work on the raw line instead of the split array):
JavaRDD<MyObject> myObjectJavaRDD = lines
        .filter(someFilter)
        .map(line -> line.split(","))
        .map(MyObject::new);
Spark can only run one concurrent task per partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions. As for choosing a "good" number of partitions, you generally want at least as many as the number of executor cores, for parallelism. You can get this computed value by calling
sc.defaultParallelism()
or inspect the number of partitions of an RDD with
someRDD.getNumPartitions()
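For example, as a quick diagnostic sketch you could drop the two lines below into the main method from your question, right after reading the file (nothing here changes the job itself):
// compare the available parallelism with how the input was actually split
System.out.println("defaultParallelism = " + sc.defaultParallelism());
System.out.println("input partitions   = " + lines.getNumPartitions());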
When creating an RDD by reading a file using
JavaRDD<String> rdd = sc.textFile("hdfs://…/file.txt");
the number of partitions may be smaller than you expect. Ideally, you would get one partition per HDFS block, but if the lines in your file are very long (longer than the block size), there will be fewer partitions.
The preferred way to set the number of partitions for an RDD is to pass it directly as the second parameter of the call, like
JavaRDD<String> rdd = sc.textFile("hdfs://…/file.txt", 400);
where 400 is the (minimum) number of partitions. In this case the split into 400 partitions is done by Hadoop's TextInputFormat, not by Spark, and it works much faster. It also means the file is loaded by 400 tasks, directly into 400 partitions.
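Applied to your job, that would be the line below; 400 is only an illustrative value, a reasonable starting point is a small multiple of the total number of cores in your cluster:
// ask for at least 400 input splits (value is illustrative, tune it to your cluster)
JavaRDD<String> lines = sc.textFile(args[0], 400);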
Repartition: increases the number of partitions; rebalancing partitions after a filter increases parallelism (see the sketch after these two signatures).
repartition(int numPartitions)
Coalesce: decreases the number of partitions WITHOUT a shuffle; useful to consolidate partitions before writing out to HDFS or an external store.
coalesce(int numPartitions, boolean shuffle)   // shuffle defaults to false via the coalesce(int numPartitions) overload
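A rough sketch of how both could fit into your pipeline (the numbers are placeholders, and the repartition only pays off if someFilter discards a large share of the data, since it triggers a shuffle):
// rebalance after a selective filter so all cores keep working
JavaRDD<MyObject> myObjectJavaRDD = lines
        .filter(someFilter)                    // someFilter applied to the raw line here
        .map(line -> line.split(","))
        .map(MyObject::new)
        .repartition(sc.defaultParallelism());

// or, if you end up with many small partitions, consolidate them without a shuffle before the write
JavaRDD<MyObject> consolidated = myObjectJavaRDD.coalesce(50);   // 50 is a placeholder
javaFunctions(consolidated).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();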
And finally, and no less important, you can do some trials with different values and benchmark how long the process takes:
long start = System.nanoTime();
// my process
long end = System.nanoTime();
long elapsed = end - start;
System.out.println("My App takes: " + elapsed + " ns");
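Keep in mind that Spark is lazy: saveToCassandra is the action that triggers the whole pipeline, so a timer around it measures read + transform + write together. In your main method that could look like:
long start = System.nanoTime();
javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
long elapsedMs = (System.nanoTime() - start) / 1_000_000;
System.out.println("Load + write to Cassandra took: " + elapsedMs + " ms");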
I hope it helps.