I have an infinite loop that is capturing video from a camera. Every so often takes an image and processes it to find faces. This process could be running as a background task given that the main process does not need a response from it. So, is there a way to make a queue of asynchronous tasks without the need of stop the main loop for any reason?
In other words. I need to create a queue of tasks that are running concurrently while the main loop is running to and this one could add more tasks to the queue.
I've tried to do it with asyncio, but it does not work. I expected an out put like:
Main loop
Main loop
Main loop
hello
Main loop
hello
...
But I got:
Main loop
hello
Main loop
hello
Main loop
hello
Main loop
hello
...
What does it mean they run one at a time.
import os
import cv2
import numpy as np
import asyncio
import time
async def process_frame(frame: np.array) -> None:
await asyncio.sleep(2)
print('hello')
# Process that could take a lot of time
# proces the image in a db and if a face from the db is recognized in the image
# this function will send a request to an web service
# The main process does not have to await for a response
async def main():
video_capture = cv2.VideoCapture(0) # Initialize camera
frame_rate = 30
prev = 0
while video_capture.isOpened():
time_elapsed = time.time() - prev
_, frame = video_capture.read() # Getting frame from opencv (cv2)
cv2.imshow('Video', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
print('Main loop')
if time_elapsed > 1./frame_rate:
prev = time.time()
task = asyncio.create_task(process_frame(frame)) # This function needs to be sent to a async queue
await task
video_capture.release()
if __name__ == '__main__':
asyncio.run(main())
The immediate problem is that you await
the task immediately after creating it. That will suspend the current coroutine until task finishes, which is what you don't want.
However, when you remove the await
, you face the more serious problem - that you are calling blocking functions, such as video_capture.read()
, from main
which is supposed to be async. That will block the event loop and won't allow the background tasks to proceed.
To fix the issue, you can completely decouple your sync code from the asyncio code, by running the asyncio event loop in a separate thread, and using asyncio.run_coroutine_threadsafe
to submit coroutines to it. For example (untested):
import os, cv2, numpy as np, asyncio, time
async def process_frame(frame):
await asyncio.sleep(2)
print('hello')
# ...
_loop = asyncio.get_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()
def main():
video_capture = cv2.VideoCapture(0) # Initialize camera
frame_rate = 30
prev = 0
while video_capture.isOpened():
time_elapsed = time.time() - prev
_, frame = video_capture.read()
cv2.imshow('Video', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
print('Main loop')
if time_elapsed > 1./frame_rate:
prev = time.time()
asyncio.run_coroutine_threadsafe(process_frame(frame), _loop)
video_capture.release()
if __name__ == '__main__':
main()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With