I have a JSON file containing many fields, which I read using Spark's Dataset API in Java.
Spark version: 2.2.0
Java JDK: 1.8.0_121
Below is the code.
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate();
Dataset<Row> df = spark.read().json("jsonfile.json");
I would like to use the withColumn function with a custom UDF to add a new column.
UDF1 someudf = new UDF1<Row,String>(){
public String call(Row fin) throws Exception{
String some_str = fin.getAs("String");
return some_str;
}
};
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();
I get a cast error when I run the above code: java.lang.String cannot be cast to org.apache.spark.sql.Row.
Questions:
1 - Is reading into a Dataset of Rows the only option? I can convert the df into a Dataset of Strings, but then I will not be able to select fields.
2 - I tried but failed to define a user-defined data type; I was not able to register the UDF with this custom data type. Do I need user-defined data types here?
3 - And the main question: how can I cast from String to Row?
Part of the log is copied below:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at Risks.readcsv$1.call(readcsv.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$27.apply(UDFRegistration.scala:512)
... 16 more
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (string) => string)
Your help will be greatly appreciated.
You are getting that exception because the UDF executes on the column's data type, which here is String, not Row. Suppose we have a Dataset<Row> ds with two columns, col1 and col2, both of String type, and we want to convert the value of col2 to uppercase using a UDF.
We can register and call the UDF like below.
spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();
Or using withColumn
ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();
And the UDF should be defined like below.
private static UDF1<String, String> toUpper = new UDF1<String, String>() {
public String call(final String str) throws Exception {
return str.toUpperCase();
}
};
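Putting the pieces together, here is a minimal end-to-end sketch. It builds a small in-memory Dataset instead of reading a JSON file, so the data and column names (col1, col2) are illustrative assumptions, not the asker's actual schema; the key point is that the UDF's input type (String) matches the column type, so no cast error occurs.

```java
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ToUpperExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("toUpper UDF example")
                .master("local")
                .getOrCreate();

        // Two String columns, matching the example in the answer above.
        StructType schema = new StructType()
                .add("col1", DataTypes.StringType)
                .add("col2", DataTypes.StringType);

        Dataset<Row> ds = spark.createDataFrame(
                Arrays.asList(RowFactory.create("a", "hello"),
                              RowFactory.create("b", "world")),
                schema);

        // The UDF parameter type is String because col2 is a String column.
        spark.udf().register("toUpper",
                (UDF1<String, String>) str -> str.toUpperCase(),
                DataTypes.StringType);

        ds.withColumn("Upper", callUDF("toUpper", col("col2"))).show();

        spark.stop();
    }
}
```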
Improving on what @abaghel wrote: if you use the following import
import org.apache.spark.sql.functions;
then, using withColumn, the code should be as follows:
ds.withColumn("Upper",functions.callUDF("toUpper", ds.col("col2"))).show();