
Python asyncio queue use get/put simultaneously

I'd like to run a Python script that simulates real-world time-series data and processes it in real time. That is, from a given dataset (a measurement about 8 hours long), I want to read one second of data every second and process each second of data as soon as it is read. In the real world, the data will be gathered every second by detectors. For this task, I've decided to use the Python asyncio module. Below is basically what I came up with.

import scipy
import numpy as np
import asyncio
import time
import queue

q = queue.Queue()

async def get_data (data):

    idx = 0  # index of the next row to emit
    while True:
        await asyncio.sleep(1)
        q.put(data[idx,:])
        idx += 1
        # Each row of data is read every second.

async def run_algorithm ():

    while True:

        if q.empty() == True:
            await asyncio.sleep(1)

        data_read = q.get(block = False)

        #I do something here with the read data


async def main (data):

    feed_data = asyncio.create_task(get_data(data))
    process_data = asyncio.create_task(run_algorithm ())
    await asyncio.gather(feed_data, process_data)

While it seems to work fine, the problem is the "#I do something here with the read data" part. That is where I use my algorithm to process the data in real time. When the algorithm is fast, it works pretty well: the data is read every second as it should be, and the run_algorithm function waits for a second when there is no data in the queue.

However, when the algorithm is slow, the data is not read every second. If the algorithm takes 0.5 seconds to run, the next data is read after 1.5 seconds instead of 1 second.

It is as if the get_data function slows down whenever the run_algorithm part slows down. Is there a way to make get_data read the data every second no matter how long the run_algorithm part takes?

asked Oct 21 '25 09:10 by jwonlee


1 Answer

As indicated in the comments, you need to use an asyncio queue, in which case you don't need the sleeps in run_algorithm:

q = asyncio.Queue()

async def get_data (data):
    idx = 0  # index of the next row to emit
    while True:
        await asyncio.sleep(1)
        await q.put(data[idx,:])
        idx += 1
        # Each row of data is read every second.

async def run_algorithm ():
    while True:
        data_read = await q.get()
        #I do something here with the read data
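
For reference, the two coroutines above can be assembled into a small program that runs to completion. This is only a sketch under a few assumptions that are not in the original post: the dataset is finite, a None sentinel marks its end, the interval parameter is made configurable so the demo finishes quickly, and summing a row stands in for the real processing. Plain lists are used for the demo rows; rows of a 2-D NumPy array should work the same way.

```python
import asyncio


async def get_data(data, q, interval):
    # Producer: emit one row per interval, then a None sentinel (an assumption
    # of this sketch) to tell the consumer that the dataset is exhausted.
    for row in data:
        await asyncio.sleep(interval)
        await q.put(row)
    await q.put(None)


async def run_algorithm(q, results):
    # Consumer: q.get() suspends until a row is available, so no polling
    # with q.empty() and asyncio.sleep() is needed.
    while True:
        row = await q.get()
        if row is None:
            break
        results.append(sum(row))  # placeholder for the real processing


async def main(data, interval=1.0):
    q = asyncio.Queue()  # created inside the running event loop
    results = []
    await asyncio.gather(get_data(data, q, interval), run_algorithm(q, results))
    return results


if __name__ == "__main__":
    demo = [[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]]  # three fake one-second rows
    print(asyncio.run(main(demo, interval=0.01)))
```

Creating the queue inside main rather than at module level keeps it tied to the event loop that asyncio.run starts, which avoids surprises on older Python versions.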

"However, when I have a slow algorithm, it does not read the data every second"

It sounds like your algorithm contains CPU-bound or otherwise blocking code. Since asyncio is single-threaded, this causes the event loop to stall. To fix it, you should run blocking code in a separate thread. Asyncio contains a utility for this purpose, run_in_executor. For example, emulating the blocking/CPU-bound call with time.sleep(1) (intentionally not using asyncio.sleep here to emulate blocking), here is how you would call it from asyncio:

        # time.sleep(1)   # forbidden: this would block the event loop
        # instead, invoke blocking code like this:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, time.sleep, 1)
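
To see the effect end to end, here is a minimal sketch that combines the asyncio queue with run_in_executor. Several things in it are assumptions rather than part of the original code: slow_algorithm is a hypothetical stand-in that blocks with time.sleep, the interval and duration parameters are shortened so the demo finishes quickly, a None sentinel ends the run, and the producer records timestamps so the gaps between rows can be inspected. Processing is deliberately slower than the production interval; because the blocking call runs in a worker thread, the gaps should stay close to the interval.

```python
import asyncio
import time


def slow_algorithm(row, duration):
    # Hypothetical stand-in for blocking or CPU-bound processing.
    time.sleep(duration)
    return sum(row)


async def get_data(data, q, interval, stamps):
    for row in data:
        await asyncio.sleep(interval)
        stamps.append(time.monotonic())  # record when each row was produced
        await q.put(row)
    await q.put(None)  # sentinel: no more data


async def run_algorithm(q, duration, results):
    loop = asyncio.get_running_loop()
    while True:
        row = await q.get()
        if row is None:
            break
        # Run the blocking call in the default thread pool so the event
        # loop stays free to wake the producer on schedule.
        result = await loop.run_in_executor(None, slow_algorithm, row, duration)
        results.append(result)


async def main(data, interval=1.0, duration=0.5):
    q = asyncio.Queue()
    stamps, results = [], []
    await asyncio.gather(
        get_data(data, q, interval, stamps),
        run_algorithm(q, duration, results),
    )
    # Time between consecutive rows as seen by the producer.
    gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
    return results, gaps


if __name__ == "__main__":
    results, gaps = asyncio.run(
        main([[1, 2], [3, 4], [5, 6]], interval=0.1, duration=0.3)
    )
    print(results, [round(g, 2) for g in gaps])
```

Two caveats apply. If processing is persistently slower than production, as in this demo, the queue keeps growing, so a long run needs either a faster algorithm or a policy for dropping or batching rows. Also, for CPU-bound code written in pure Python, threads still compete for the interpreter lock; passing a concurrent.futures.ProcessPoolExecutor as the first argument to run_in_executor is one option in that case, provided the function and its arguments can be pickled.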
answered Oct 22 '25 23:10 by user4815162342


