Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python asyncio bidirectional communications hardware control

I'm building some hardware on a raspberry pi where I have a number of sensors and a bunch of actuators. I would like to use Asyncio coroutines to continually monitor multiple sensors (essentially so I don't have to poll from a main loop) and then do something wth a bunch of actuators.

I intend to have a class per sensor, which will have methods like the coroutine in the code below.

I would like to yield a result from a sensor method to some variable which I can then act on.

My question is, if I have multiple coroutines writing to a single place how do I do that safely. The queues in asyncio seem to be one to one, not many to one - is that correct?

Ultimately I dont understand how to have multiple coroutines return to one place, have some logic and then send messages to other coroutines

+------------+
|            |                                +------------+
|  Sensor 1  +-------+                        |            |
|            |       |                    +--->  actuator1 |
+------------+       |                    |   |            |
                     |                    |   +------------+
+------------+       |      +-----------+ |
|            |       |      |           | |
|   Sensor 2 +------------> |  logic    +-+
|            |       |      |           | |
+------------+       |      +-----------+ |
                     |                    |   +------------+
+------------+       |                    |   |            |
|            |       |                    +--->  actuator2 |
|  Sensor 3  +-------+                        |            |
|            |                                +------------+
+------------+

The above represents what I am trying to achieve. I know I could achieve this with polling and a while loop, but I liked the idea of trying asyncio/ event driven approach.

import asyncio
import random

async def sensor(queue):
    while True:
        # Get some sensor data
        sensor_data = "data"

        await queue.put(sensor_data)


async def actuator(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())

loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
sensor_coro = sensor(queue)
actuator_coro = actuator(queue)
loop.run_until_complete(asyncio.gather(sensor_coro, actuator_coro))
loop.close()
like image 995
gadgetlab Avatar asked Sep 05 '25 03:09

gadgetlab


1 Answers

My question is, if I have multiple coroutines writing to a single place how do I do that safely. The queues in asyncio seem to be one to one, not many to one - is that correct?

That is not correct; asyncio queues are multiple-producer multiple-consumer. To implement the logic of your diagram, you need two synchronization primitives:

  • a queue, filled by multiple instances of the sensor coroutine and drained by a single instance of the logic() coroutine

  • an additional synchronization device per each actuator. Which kind of device is best here will depend on the requirements. For example, are actuators allowed to "lose" messages that come in faster than they are able to respond? Or, is the logic supposed to wait up? Depending on those, the synchronization between logic() and each actuator will be either a simple Event (or even just a Future) or another queue.

Assuming you use a queue per actuator, your logic coroutine might look like this:

async def logic(sensor_queue, actuator_queues):
    while True:
        item = await queue.get()
        # process the item and signal some actuators
        await actuator_queues[0].put(x1)
        await actuator_queues[1].put(x2)
like image 183
user4815162342 Avatar answered Sep 07 '25 20:09

user4815162342