Convert a List of Maps in Java to a Dataset in Spark

I have a list of maps in Java, each essentially representing a row.

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);

Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);

dataList.add(row1);
dataList.add(row2);

I'm trying to create a Spark DataFrame from it.

I've tried to convert it into JavaRDD<Map<String, Object>> using

JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);

But I'm not sure how to go from here to Dataset<Row>. I've seen Scala examples but none in Java.

I also tried converting the list to a JSON string and reading that back.

String jsonStr = mapper.writeValueAsString(dataList);

But it seems like I would have to write it to a file and then read it using

Dataset<Row> df = spark.read().json(pathToFile);

I would prefer to do it in-memory if possible rather than writing to a file and reading it back. Here is my full code:

SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
        .set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);

Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);

dataList.add(row1);
dataList.add(row2);

ObjectMapper mapper = new ObjectMapper();

String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();
asked Jan 27 '26 by gargravarr


1 Answer

You do not need to use RDDs at all. What you need to do is extract the desired schema from your list of maps, transform your list of maps into a list of rows, and then use spark.createDataFrame.

In Java, that's a bit painful, particularly when creating the Row objects, but here is how it could go:

import java.util.*;
import java.util.stream.Collectors;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.collection.JavaConverters;

// Column names come from the first map's key set.
List<String> cols = new ArrayList<>(dataList.get(0).keySet());
// For each map: read values in column order, stringify, convert to a Scala Seq, build a Row.
List<Row> rows = dataList
    .stream()
    .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
    .map(row -> row.collect(Collectors.toList()))
    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
    .map(Row$.MODULE$::fromSeq)
    .collect(Collectors.toList());
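
As an aside, if the Scala interop feels heavy, the same step can be done with RowFactory.create from Spark's public Java API, which builds a Row straight from an Object[]. A minimal sketch, reusing cols and dataList from above:

import org.apache.spark.sql.RowFactory;

// Same conversion, no Scala collections involved.
List<Row> rows = dataList.stream()
    .map(m -> RowFactory.create(
        cols.stream().map(c -> String.valueOf(m.get(c))).toArray()))
    .collect(Collectors.toList());

Either way, the schema is then built from the same column names: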

// Every value was stringified above, so each column is a nullable StringType.
StructType schema = new StructType(
    cols.stream()
        .map(c -> new StructField(c, DataTypes.StringType, true, Metadata.empty()))
        .collect(Collectors.toList())
        .toArray(new StructField[0])
);
Dataset<Row> result = spark.createDataFrame(rows, schema);
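
Note that this approach turns every value into a string. If you would rather let Spark infer the types (fund and broker as strings, qty as a number), the JSON idea from the question also works fully in-memory: since Spark 2.2, spark.read().json accepts a Dataset<String>, so nothing needs to be written to a file. A minimal sketch, reusing the Jackson ObjectMapper from the question:

import org.apache.spark.sql.Encoders;

// Serialize each map as one JSON object per record...
List<String> jsonRows = new ArrayList<>();
for (Map<String, Object> m : dataList) {
    jsonRows.add(mapper.writeValueAsString(m));
}
// ...and let Spark infer the schema from the in-memory JSON.
Dataset<Row> df = spark.read().json(spark.createDataset(jsonRows, Encoders.STRING()));
df.show();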
answered Jan 29 '26 by Oli