async file read with AIOfile

I'm trying to read several CSV files with asyncio, but I don't want to block the main event loop while doing that.

So I checked aiofile, which seems to promise non-blocking reads. While that might be true, the following snippet takes a huge amount of time to complete; it's basically the same example as the one here: https://github.com/mosquito/aiofile#read-file-line-by-line

import asyncio
from aiofile import AIOFile, LineReader
from pathlib import Path
import time

counter = 0

async def main():
    path = 'test_data'
    global counter
    data_dir = Path(path)
    files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
    list_of_files = [(path + '/' + file.name, file) for file in files_in_basepath]
    for file in list_of_files:
        line_count = 0
        async with AIOFile(file[0]) as afp:
            await afp.fsync()
            async for line in LineReader(afp):
                #print(line)
                values = ''
                line_values = line.split(',')
                for item in line_values:
                    values = values + item + ' '
                # print(values)
                line_count += 1
        print(f'Processed {line_count} lines in file {file[1].name}.')
        counter += 1

start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
duration = time.time() - start_time
print(f"Processed {counter} data files in {duration} seconds")

This gives terrible performance; reading 100 files takes:

Processed 100 data files in 196.8809883594513 seconds

Compared with plain sequential processing of the same files, the difference is just incredible:

Processed 100 data files in 0.9933180809020996 seconds

So I'm wondering what's happening here. I've also seen in several places the recommendation to run IO operations in an executor so the event loop isn't blocked.
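
I assume they mean something along these lines (just a rough sketch of that pattern, not my actual code):

import asyncio
import csv
from pathlib import Path

def read_one_file(path):
    # Plain blocking read, same idea as my sequential version.
    with open(path) as csv_file:
        return sum(1 for _ in csv.reader(csv_file, delimiter=','))

async def main():
    paths = [p for p in Path('test_data').iterdir() if p.is_file()]
    loop = asyncio.get_running_loop()
    # Hand each blocking read to the default thread pool so the event loop stays free.
    counts = await asyncio.gather(
        *(loop.run_in_executor(None, read_one_file, p) for p in paths)
    )
    print(f'Processed {len(counts)} data files')

asyncio.run(main())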

Just to mention, I have some other code that runs this on threads and performs almost as well as the sequential version:

import concurrent.futures
import csv
import threading
import time
from pathlib import Path

c_lock = threading.Lock()
counter = 0

def read_data_file(files):
    # Get the info from second item from tuple
    info = files[1].stat()
    global c_lock
    global counter
    c_lock.acquire()
    print(info.st_mtime)
    print(f'File name is {files[1].name} with size {round(info.st_size / float(1 << 10), 2)} KB')
    with open(files[0]) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader:
            # Just assume we do something very interesting with these values...
            values = ''
            for item in row:
                values = values + item + ' '
            #print(values)
            line_count += 1
        print(f'Processed {line_count} lines in file {files[1].name}.')
    counter += 1
    c_lock.release()

def read_data_files(path):
    # List all files in data folder
    data_dir = Path(path)
    files_in_basepath = (entry for entry in data_dir.iterdir() if entry.is_file())
    list_of_files = []
    for file in files_in_basepath:
        list_of_files.append((path + '/' + file.name, file))
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        executor.map(read_data_file, list_of_files)


if __name__ == "__main__":
    data_files = 'test_data'
    start_time = time.time()
    read_data_files(data_files)
    duration = time.time() - start_time
    print(f"Processed {counter} data files in {duration} seconds")

This gives the following:

Processed 100 data files in 1.0079402923583984 seconds

I'm wondering if I'm doing something wrong with asyncio or if I should skip it altogether... I'm just trying to find out what the most efficient way to process all these files is: sequential, threaded (including asyncio), or multiprocessing.

asked Sep 02 '25 by AlejandroVK

1 Answer

Your multi-threaded code locks all of read_data_file with a giant lock, forcing it to be executed sequentially and resulting in the threaded version performing no better than the sequential one.
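
The lock only needs to protect the shared counter and the prints, not the file reading itself. A minimal sketch of how read_data_file could be restructured (reusing the csv, c_lock and counter names from the question):

def read_data_file(files):
    info = files[1].stat()
    line_count = 0
    # Do the slow part (disk IO and CSV parsing) outside the lock so threads can overlap.
    with open(files[0]) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        for row in csv_reader:
            values = ' '.join(row)  # stand-in for the "very interesting" per-row processing
            line_count += 1
    global counter
    # Only the shared state and the interleaved printing need the lock.
    with c_lock:
        print(f'File name is {files[1].name} with size {round(info.st_size / float(1 << 10), 2)} KB')
        print(f'Processed {line_count} lines in file {files[1].name}.')
        counter += 1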

The asyncio version also runs sequentially because the code never uses asyncio.gather or similar to run the per-file coroutines concurrently. As for why it's ~200x slower than the regular sequential version, that might be a good question to ask the aiofile devs. I suspect that each line-reading operation is separately handed off to an internal thread, slowing it down due to the immense bookkeeping overhead in such a hot loop.
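
For completeness, here is a rough sketch of what a concurrent version of the coroutine could look like with asyncio.gather (same aiofile API as in the question; given the overhead described above, don't expect it to beat the plain sequential reader):

import asyncio
from pathlib import Path
from aiofile import AIOFile, LineReader

async def process_one(path):
    line_count = 0
    async with AIOFile(str(path)) as afp:
        async for line in LineReader(afp):
            line.split(',')  # stand-in for the per-line processing
            line_count += 1
    return line_count

async def main():
    files = [p for p in Path('test_data').iterdir() if p.is_file()]
    # Run all per-file coroutines concurrently instead of awaiting them one by one.
    counts = await asyncio.gather(*(process_one(p) for p in files))
    print(f'Processed {len(files)} data files, {sum(counts)} lines total')

asyncio.run(main())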

In summary:

  • if your bottle-neck is speed of IO, you might gain something by using multiple threads, as long as you take care not to make things sequential due to unnecessary locking. (The GIL won't be a problem because it is automatically released around IO operations.)

  • if your bottleneck is speed of CPU, you probably want to investigate multiprocessing, as multiple threads won't help due to the GIL (see the sketch after this list). For example, when reading CSV files, the time it takes to parse the file contents and convert them to numbers might dwarf the time it takes to read them from disk, especially if the files are cached in memory by the OS.

  • asyncio and aiofile most likely won't help you with the speed of processing CSV files. aiofile is most useful when you need to integrate reads of files that might get "stuck" (e.g. because they live on a network drive that's no longer reachable) into an asyncio program. In its current implementation it's not useful for reading files where high throughput is required.
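
A minimal sketch of the multiprocessing variant (each worker returns its own line count instead of sharing a counter, so no lock is needed):

import concurrent.futures
import csv
from pathlib import Path

def count_lines(path):
    # Runs in a worker process: parse one CSV file and return its line count.
    with open(path) as csv_file:
        return sum(1 for _ in csv.reader(csv_file, delimiter=','))

if __name__ == "__main__":
    files = [p for p in Path('test_data').iterdir() if p.is_file()]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        counts = list(executor.map(count_lines, files))
    print(f'Processed {len(counts)} data files, {sum(counts)} lines total')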

TL;DR: first try to get the speed-up by using threads correctly, and if that doesn't work, by using multiprocessing.

answered Sep 04 '25 by user4815162342