I want to create a REST service using FastAPI and aio-pika while working asynchronously. For other async database drivers, I could create clients on startup, when get them in route handlers. For example, with motor I would declare simple connection manager:
from motor.motor_asyncio import AsyncIOMotorClient
class Database:
client: AsyncIOMotorClient = None
db = Database()
async def connect_to_mongo():
db.client = AsyncIOMotorClient("mongo:27017")
async def close_mongo_connection():
db.client.close()
async def get_mongo_client() -> AsyncIOMotorClient:
return db.client
Then add couple of handlers:
app.add_event_handler("startup", connect_to_mongo)
app.add_event_handler("shutdown", close_mongo_connection)
and then just use get_mongo_client to get one to my handler.
Problem here is that aio-pika needs asyncio loop to function. Here is an example from the docs:
connection = await aio_pika.connect_robust(
"amqp://guest:[email protected]/", loop=loop
)
And with FastAPI I don't have asyncio loop. Are there any way to use it with interface like in example? Can I just create new loop using asyncio.get_event_loop() and pass it to the connect_robust without really using it anywhere? Like this:
connection = await aio_pika.connect_robust(
"amqp://guest:[email protected]/", loop=asyncio.get_event_loop()
)
Ok, so, according to the docs, I can just use connect instead of connect_robust:
connection = await aio_pika.connect(
"amqp://guest:[email protected]/"
)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With