How do I set up a Python service that (asynchronously) watches the change streams of a MongoDB deployment?
All I can find on mongodb.com and in the PyMongo docs are the following two approaches, which do not really seem production-ready:
Approach mongodb.com:
import os
import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('<YOUR-MONGO-CONNECT-STRING>')
change_stream = client.changestream.collection.watch()
for change in change_stream:
    print(dumps(change))
Approach pymongo docs:
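import time
with db.collection.watch() as stream:
    while stream.alive:
        change = stream.try_next()
        # The resume token may advance even when no change is returned.
        print("Current resume token: %r" % (stream.resume_token,))
        if change is not None:
            print("Change document: %r" % (change,))
            continue
        # No recent changes: sleep before polling the server again.
        time.sleep(10)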
I thought about a solution that uses the watch function as a callback for an event loop. Does anybody know of an implementation of this?
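One way to sketch the event-loop idea with plain PyMongo is to poll the blocking stream from a worker thread and hand each change to a coroutine. This is only a minimal sketch under assumptions: `watch_async`, `handle` and `poll_interval` are hypothetical names, and `stream` is assumed to behave like a PyMongo `ChangeStream` (it has `alive` and `try_next()`, as in the PyMongo docs approach above).

```python
import asyncio


async def watch_async(stream, handle, poll_interval=1.0):
    """Forward changes from a blocking change stream to an async handler.

    `stream` is assumed to expose `alive` and `try_next()` like a PyMongo
    ChangeStream; `handle` is a hypothetical coroutine function.
    """
    loop = asyncio.get_running_loop()
    while stream.alive:
        # try_next() talks to the server and blocks, so run it in the
        # default thread pool instead of on the event loop itself.
        change = await loop.run_in_executor(None, stream.try_next)
        if change is None:
            # Nothing new: yield to other tasks before polling again.
            await asyncio.sleep(poll_interval)
            continue
        await handle(change)
```

It would be called from inside a coroutine, for example as `await watch_async(stream, handle)` within a `with db.collection.watch() as stream:` block. Other tasks on the loop keep running while the worker thread waits for the server.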
A simple approach in order to get notifications for all MongoDB updates is the following:
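import pymongo
from bson.json_util import dumps
# Placeholder: substitute your own connection string.
client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING")
# Ask the server to attach the full, current document to each update event.
options = {'full_document': 'updateLookup'}
# Only pass through events whose operationType is "update".
pipeline = [{'$match': {'operationType': 'update'}}]
change_stream = client.mongo_db_name.mongo_db_collection.watch(pipeline, **options)
for change in change_stream:
    print(dumps(change))
    print('')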
You will need to replace
You can also use a resume token to handle failures such as a network error. With a resume token, the stream reconnects and you receive notifications from the point at which the connection was lost.
import logging
import pymongo
from pymongo import errors
# Placeholder: substitute your own connection string.
client = pymongo.MongoClient("mongodb+srv://YOUR-MONGO-CONNECT-STRING")
collection = client.mongo_db_name.mongo_db_collection
pipeline = [{'$match': {'operationType': 'update'}}]
resume_token = None
try:
    with collection.watch(pipeline) as stream:
        for update_change in stream:
            print(update_change)
            # Remember the position of the last change that was processed.
            resume_token = stream.resume_token
except pymongo.errors.PyMongoError:
    if resume_token is None:
        # The stream failed before any change was seen, so there is nothing to resume from.
        logging.error('...')
    else:
        # Reopen the stream just after the last processed change.
        with collection.watch(pipeline, resume_after=resume_token) as stream:
            for update_change in stream:
                print(update_change)
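The snippet above resumes only once, so a second failure would end the watcher. For a long-running service the same idea could be wrapped in a loop. The following is a rough sketch, not a definitive implementation: `watch_with_resume`, `handle`, `retry_on`, `max_retries` and `backoff` are hypothetical names, and `collection` is assumed to be a PyMongo collection whose `watch()` accepts `resume_after`.

```python
import logging
import time


def watch_with_resume(collection, pipeline, handle, retry_on,
                      max_retries=5, backoff=1.0):
    """Consume a change stream, reopening it from the last resume token on error.

    `retry_on` is the exception class (or tuple of classes) treated as
    retryable, e.g. pymongo.errors.PyMongoError.
    """
    resume_token = None
    retries = 0
    while True:
        try:
            # resume_after=None opens a fresh stream; otherwise the stream
            # continues just after the change identified by the token.
            with collection.watch(pipeline, resume_after=resume_token) as stream:
                for change in stream:
                    handle(change)
                    # Record the position only after the change was handled.
                    resume_token = stream.resume_token
                    retries = 0
            return  # the stream was closed without an error
        except retry_on:
            retries += 1
            if retries > max_retries:
                raise
            logging.warning("change stream failed (attempt %d), reopening", retries)
            # Back off a little longer after each consecutive failure.
            time.sleep(backoff * retries)
```

A call might look like `watch_with_resume(client.mongo_db_name.mongo_db_collection, pipeline, print, retry_on=pymongo.errors.PyMongoError)`. Note one design choice that differs from the snippet above: if the stream fails before any change has been seen, this sketch opens a fresh stream instead of giving up.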