In my project, I'm transferring data from MongoDB into a Spark SQL table to run SQL-based queries. But Spark SQL only lets me create temporary tables. When I want to query something, the execution time is very high, because the data transfer and mapping operations take too much time.
So, can I reduce the execution time? Can I create permanent Spark SQL tables? Can I query permanent tables over JDBC?
I'm including my code and execution time results below. I'm running everything in standalone mode.
package com.mongodb.spark.sql;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.spark.demo.Observation;
import com.mongodb.spark.demo.Sensor;

import scala.Tuple2;

public class SparkSqlMongo {

    public static void main(String[] args) {
        // Input configuration for the observations collection
        Configuration conf = new Configuration();
        conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        conf.set("mongo.input.uri", "mongodb://localhost:27017/test.observations");

        // Input configuration for the sensors collection
        Configuration sensConf = new Configuration();
        sensConf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        sensConf.set("mongo.input.uri", "mongodb://localhost:27017/test.sens");

        SparkConf sconf = new SparkConf().setMaster("local[2]").setAppName("SQL DENEME")
                .set("nsmc.connection.host", "mongodb:");

        JavaSparkContext sc = new JavaSparkContext(sconf);
        SQLContext sql = new SQLContext(sc);

        // Map each BSON document of the observations collection to an Observation bean
        JavaRDD<Observation> obs = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class)
                .map(new Function<Tuple2<Object, BSONObject>, Observation>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Observation call(Tuple2<Object, BSONObject> v1) throws Exception {
                        int id = (int) v1._2.get("_id");
                        double value = (double) v1._2.get("Value");
                        // Date time = (Date) v1._2.get("Time");
                        int sensor = (int) v1._2.get("SensorId");
                        int stream = (int) v1._2.get("DataStreamId");
                        Observation obs = new Observation(id, value, sensor, stream);
                        return obs;
                    }
                });

        DataFrame obsi = sql.createDataFrame(obs, Observation.class);
        obsi.registerTempTable("obsi");

        // Map each BSON document of the sensors collection to a Sensor bean
        JavaRDD<Sensor> sens = sc.newAPIHadoopRDD(sensConf, MongoInputFormat.class, Object.class, BSONObject.class)
                .map(new Function<Tuple2<Object, BSONObject>, Sensor>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Sensor call(Tuple2<Object, BSONObject> v1) throws Exception {
                        int id = (int) v1._2.get("_id");
                        String name = (String) v1._2.get("Name");
                        String description = (String) v1._2.get("Description");
                        Sensor s = new Sensor(id, name, description);
                        System.out.println(s.getName());
                        return s;
                    }
                });

        DataFrame sensi = sql.createDataFrame(sens, Sensor.class);
        sensi.registerTempTable("sensi");
        sensi.show();

        long start = System.currentTimeMillis();
        DataFrame obser = sql
                .sql("SELECT obsi.value, obsi.id, sensi.name FROM obsi, sensi WHERE obsi.sensorID = sensi.id and sensi.id = 107")
                .cache();
        long stop = System.currentTimeMillis();

        // System.out.println("count ====>>> " + a.toString());
        System.out.println("total query time : " + (stop - start));

        // while (!obser.equals(null)) {
        //     System.out.println(obser);
        // }

        List<String> names = obser.javaRDD().map(new Function<Row, String>() {
            private static final long serialVersionUID = 1L;

            public String call(Row row) {
                // System.out.println(row);
                // System.out.println("value : " + row.getDouble(0) + " id : " +
                // row.getInt(1) + " name : " + row.getString(0));
                return "Name: " + row;
            }
        }).collect();
    }
}
The total execution time is about 120 seconds for roughly 5M observation documents and 1K sensor documents. I join these tables, and this execution time is far too high and unacceptable.
To reduce execution time, cache your table, DataFrame, or RDD. To get permanent tables, use the df.saveAsTable method, but the DataFrame has to be created through a HiveContext.
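A minimal sketch of that idea, reusing the sc context and the obs RDD from your code. It assumes a Spark 1.x build with Hive support on the classpath and the default metastore/warehouse settings; the table and bean names are taken from your question, everything else is illustrative:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// HiveContext instead of SQLContext, so tables can be persisted to the metastore
HiveContext hive = new HiveContext(sc.sc());

DataFrame obsi = hive.createDataFrame(obs, Observation.class);

// Permanent table in the warehouse, unlike registerTempTable,
// which only lives as long as the context
obsi.saveAsTable("obsi");

// Keep the table in memory so repeated queries skip the Mongo read and the bean mapping
hive.cacheTable("obsi");

// Later queries read the saved (and, after the first run, cached) table
DataFrame result = hive.sql("SELECT value, sensorId FROM obsi WHERE sensorId = 107");
result.show();

Note that caching is lazy: the first query after cacheTable still pays the full load cost, and only the queries after that one read from memory.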
For JDBC, run the Spark SQL Thrift service; then you can execute Spark SQL queries against the saved tables from any JDBC client.
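To illustrate the JDBC side, a hedged example that assumes the Thrift server has been started from the Spark distribution (sbin/start-thriftserver.sh) against the same metastore, is listening on its default port 10000, and that the Hive JDBC driver is on the classpath; the obsi table is the one saved above, and the ThriftJdbcQuery class name is only for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ThriftJdbcQuery {
    public static void main(String[] args) throws Exception {
        // The Spark SQL Thrift server speaks the HiveServer2 protocol,
        // so the plain Hive JDBC driver is enough to connect to it
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "", "");
             Statement stmt = con.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT value, sensorId FROM obsi WHERE sensorId = 107")) {
            while (rs.next()) {
                System.out.println("value: " + rs.getDouble(1) + " sensorId: " + rs.getInt(2));
            }
        }
    }
}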