I'm building some hardware on a raspberry pi where I have a number of sensors and a bunch of actuators. I would like to use Asyncio coroutines to continually monitor multiple sensors (essentially so I don't have to poll from a main loop) and then do something wth a bunch of actuators.
I intend to have a class per sensor, which will have methods like the coroutine in the code below.
I would like to yield a result from a sensor method to some variable which I can then act on.
My question is, if I have multiple coroutines writing to a single place how do I do that safely. The queues in asyncio seem to be one to one, not many to one - is that correct?
Ultimately I dont understand how to have multiple coroutines return to one place, have some logic and then send messages to other coroutines
+------------+
| | +------------+
| Sensor 1 +-------+ | |
| | | +---> actuator1 |
+------------+ | | | |
| | +------------+
+------------+ | +-----------+ |
| | | | | |
| Sensor 2 +------------> | logic +-+
| | | | | |
+------------+ | +-----------+ |
| | +------------+
+------------+ | | | |
| | | +---> actuator2 |
| Sensor 3 +-------+ | |
| | +------------+
+------------+
The above represents what I am trying to achieve. I know I could achieve this with polling and a while loop, but I liked the idea of trying asyncio/ event driven approach.
import asyncio
import random
async def sensor(queue):
while True:
# Get some sensor data
sensor_data = "data"
await queue.put(sensor_data)
async def actuator(queue):
while True:
# wait for an item from the producer
item = await queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# process the item
print('consuming item {}...'.format(item))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
sensor_coro = sensor(queue)
actuator_coro = actuator(queue)
loop.run_until_complete(asyncio.gather(sensor_coro, actuator_coro))
loop.close()
My question is, if I have multiple coroutines writing to a single place how do I do that safely. The queues in asyncio seem to be one to one, not many to one - is that correct?
That is not correct; asyncio queues are multiple-producer multiple-consumer. To implement the logic of your diagram, you need two synchronization primitives:
a queue, filled by multiple instances of the sensor
coroutine and drained by a single instance of the logic()
coroutine
an additional synchronization device per each actuator. Which kind of device is best here will depend on the requirements. For example, are actuators allowed to "lose" messages that come in faster than they are able to respond? Or, is the logic
supposed to wait up? Depending on those, the synchronization between logic()
and each actuator will be either a simple Event
(or even just a Future
) or another queue.
Assuming you use a queue per actuator, your logic
coroutine might look like this:
async def logic(sensor_queue, actuator_queues):
while True:
item = await queue.get()
# process the item and signal some actuators
await actuator_queues[0].put(x1)
await actuator_queues[1].put(x2)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With