I am trying to read a CSV file (which is supposed to have a header) in Spark and load the data into an existing table (with predefined columns and datatypes). The CSV file can be very large, so it would be great if I could avoid loading it at all when the column header in the CSV is not "valid".
When I read the file, I currently specify a StructType as the schema, but this does not validate that the header contains the right columns in the right order. This is what I have so far (I'm building the "schema" StructType in another place):
sqlContext
    .read()
    .format("csv")
    .schema(schema)
    .load("pathToFile");
If I add the .option("header", "true") line, it will skip over the first line of the CSV file and use the names I'm passing in the StructType's add method. For example, if I build the StructType with "id" and "name" and the first row in the CSV is "idzzz,name", the resulting dataframe will have columns "id" and "name". I want to be able to validate that the CSV header has the same column names as the table I'm planning to load the CSV into.
I tried reading the file with .head(), and doing some checks on that first row, but that downloads the whole file.
Any suggestions are more than welcome.
From what I understand, you want to validate the schema of the CSV you read. The problem with the schema option is that it tells Spark what the schema of your data is; it does not check that the data actually matches it.
There is, however, an option that infers the schema when reading a CSV (inferSchema), which could be very useful in your situation. You can then either compare that schema with the one you expect using equals, or use the small workaround I introduce further down to be a little more permissive.
Let's see how it works with the following file:
a,b
1,abcd
2,efgh
Then, let's read the data. I used the Scala REPL, but you should be able to convert all of this to Java very easily.
import org.apache.spark.sql.types._
val df = spark.read
  .option("header", true)      // use the first line as the header
  .option("inferSchema", true) // infer the schema from the data
  .csv(".../file.csv")
// then let's define the schema you would expect
val schema = StructType(Array(StructField("a", IntegerType),
                              StructField("b", StringType)))
// And we can check that the schema spark inferred is the same as the one
// we expect:
schema.equals(df.schema)
// res14: Boolean = true
Going further
That's in a perfect world. If your schema contains non-nullable columns, for instance, or has other small differences, this solution based on strict object equality will not work.
val schema2 = StructType(Array(StructField("a", IntegerType, false),
                               StructField("b", StringType, true)))
// the first column is non-nullable, so this does not work because all the
// columns are nullable when inferred by Spark:
schema2.equals(df.schema)
// res15: Boolean = false
In that case you may need to implement a schema comparison method that suits your needs, like:
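def equalSchemas(s1: StructType, s2: StructType): Boolean = {
  // schemas with a different number of columns can never match
  s1.length == s2.length &&
    s1.zip(s2).forall { case (f1, f2) =>
      // names are compared case-insensitively, types strictly; nullability is ignored
      f1.name.equalsIgnoreCase(f2.name) && f1.dataType == f2.dataType
    }
}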
equalSchemas(schema2, df.schema)
// res23: Boolean = true
I am checking that the names and types of the columns match and that the order is the same. You may need to implement different logic depending on what you want.
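As one example of such different logic: inferSchema makes Spark take an extra pass over the data, which may be costly for a very large file. If only the column names and their order matter, a cheaper pre-check could read just the header line before starting any Spark job. The following is a minimal Java sketch of that idea, not something Spark provides: CsvHeaderCheck, readHeaderLine and headerMatches are hypothetical names, and it assumes the file is on a filesystem the driver can open directly, uses a plain comma separator, and has no quoted header fields.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper (not part of Spark): validates a CSV header up front.
final class CsvHeaderCheck {

    // Returns the first line of the file, or "" for an empty file.
    // Reading stops after that line, so only a small buffered chunk is consumed.
    static String readHeaderLine(Path csvFile) {
        try (BufferedReader reader = Files.newBufferedReader(csvFile, StandardCharsets.UTF_8)) {
            String line = reader.readLine();
            return line == null ? "" : line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True when the header holds exactly the expected column names, in the same order.
    // Assumption: comma separator, no quoted or escaped header fields.
    // Names are trimmed and compared case-insensitively, like the Scala method above.
    static boolean headerMatches(String headerLine, List<String> expectedColumns) {
        String[] actual = headerLine.split(",", -1);
        if (actual.length != expectedColumns.size()) {
            return false;
        }
        for (int i = 0; i < actual.length; i++) {
            if (!actual[i].trim().equalsIgnoreCase(expectedColumns.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

With the schema from the question, the expected names could come from Arrays.asList(schema.fieldNames()), and the Spark read would only be started when headerMatches returns true. For files on HDFS or an object store, the first line would have to be read through the corresponding filesystem API instead of java.nio; that part is left out of this sketch.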