I am using PySpark with MongoDB and would like to query my database using a pipeline with a date filter. In Mongo my query looks like this:
db.collection.aggregate([{$match:{"creation":{$lte:new Date("Jan 1, 2016")}}},{$sort:{"creation":1}}])
But I don't know how to do the same thing in Python. For example, I tried:
pipeline = [{'$match': {'creation': {'$lte': datetime.datetime(2016, 1, 1, 0, 0)}}}, {'$sort': {'creation': 1}}]
df = context.read.format("com.mongodb.spark.sql").options(pipeline=pipeline).load()
and I got this error: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'datetime'.
(I want to do everything in the pipeline and not in a SQL query)
You can utilise MongoDB Extended JSON to specify the date. The pipeline option is parsed as JSON on the connector side, so it has to be passed as a string rather than a Python list; the stringified datetime object inside your list is what triggered the JsonParseException. The Extended JSON $date operator expresses the date instead. For example:
pipeline = "[{'$match': {'creation': {'$lte': {'$date': '2016-01-01T00:00:00Z'}}}}]"
df_pipeline = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
df_pipeline.first()
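If you'd rather build the pipeline from Python objects instead of hand-writing the string, here is a minimal sketch of one way to do it (assuming the same connector and sqlContext as above; json.dumps serializes the stages, and the $sort stage from your original query is included):

import datetime
import json

# Hypothetical cutoff date; adjust to whatever you need.
cutoff = datetime.datetime(2016, 1, 1)

# Build the pipeline as Python dicts, using the Extended JSON $date
# operator for the BSON date, then serialize the whole thing to a
# JSON string for the connector's pipeline option.
pipeline = json.dumps([
    {'$match': {'creation': {'$lte': {'$date': cutoff.strftime('%Y-%m-%dT%H:%M:%SZ')}}}},
    {'$sort': {'creation': 1}}
])

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("pipeline", pipeline) \
    .load()

json.dumps emits double-quoted JSON, which the connector's BSON parser accepts just like the single-quoted form above.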