 

SparkSQL Pushdown Filtering not Working in Spark Cassandra Connector

I have a table with the following schema:

appname text,
randomnum int,
addedtime timestamp,
shortuuid text,
assetname text,
brandname text,

PRIMARY KEY ((appname, randomnum), addedtime, shortuuid)

addedtime is a clustering key.
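
For reference, tabledf below is a DataFrame read from this table. A minimal sketch of how it might be created (the keyspace and table names, "ks" and "mytable", are assumptions, not taken from the question):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc is the existing SparkContext

// Read the Cassandra table as a DataFrame via the connector's data source
val tabledf = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "mytable"))
  .load()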

Now when I apply a pushdown filter on the clustering key addedtime, I do not see it being applied:

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain
== Physical Plan ==
Filter (cast(addedtime#2 as string) > 2016-12-20 11:00:00)

According to the docs, it should be applied: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushdown-filter-examples

This was also working with Spark Cassandra Connector 1.4, but not with the latest one, 1.6.0-M1. Please let me know what the issue is.

Nipun


1 Answer

Problem analysis

The issue seems to be the way Catalyst is processing the comparison.

When doing

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain

It is casting the addedtime column to a String and then doing the comparison. Catalyst is not presenting this predicate to the Spark Cassandra Connector, so there is no way to push it down.

INFO  2016-03-08 17:10:49,011 org.apache.spark.sql.cassandra.CassandraSourceRelation: Input Predicates: []
Filter (cast(addedtime#2 as string) > 2015-08-03)

This is also wrong because it does a string comparison (which happens to work lexically here but isn't really what you want to do). So this looks like a bug in Catalyst, since we should probably present the predicate to the source even if there is a cast. There is a workaround, though, which involves giving the Catalyst optimizer what it wants to see.

Workaround

If instead we give a type hint:

df.filter("addedtime > cast('2015-08-03' as timestamp)").explain

Then Spark will generate the correct comparison without the string cast:

DEBUG 2016-03-08 17:11:09,792 org.apache.spark.sql.cassandra.CassandraSourceRelation: Basic Rules Applied:
C* Filters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
Spark Filters []

== Physical Plan ==
Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@332464fe[appname#0,randomnum#1,addedtime#2,shortuuid#3] PushedFilters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
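
As a side note (not from the original answer), passing a typed timestamp value through the Column API is another way to avoid the string cast, so the predicate can be pushed down as a GreaterThan on a timestamp. A minimal sketch:

import java.sql.Timestamp
import org.apache.spark.sql.functions.col

// Comparing against a java.sql.Timestamp keeps the predicate typed as a
// timestamp, so Catalyst introduces no string cast and the filter can be
// presented to the connector for pushdown.
val pushed = tabledf.filter(col("addedtime") > Timestamp.valueOf("2015-08-03 00:00:00"))
pushed.explain()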
RussS


