
AIORedis and PUB/SUB aren't async

I used aioredis to write an async service that listens on a certain channel and runs some commands asynchronously.

I took the code from the examples page to write a small test app and removed the unnecessary parts:

import asyncio
import aioredis

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

There is also another test app, which publishes JSON blobs with a sleep_for field. The subscriber app uses that field to emulate some work inside the reader coroutine with a sleep call.

I expected the sleeps to run in parallel, but in fact they appear on the screen synchronously, one after the other.

My guess was that as soon as I hit the await ch.get_json() (or maybe even the await ch.wait_message()) line, I should be able to handle the next message. In practice it runs like synchronous code. Where am I wrong? This could be handled using connection pools, but that would mean something is not async, and I have no idea what exactly.

Glueon asked Oct 20 '25 14:10


1 Answer

My guess was that as soon as I hit the await ch.get_json() (or maybe even the await ch.wait_message()) line, I should be able to handle the next message.

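That's not how async/await works. Every time a coroutine hits an await, that coroutine is suspended until the awaited operation finishes. Control passes to the awaited coroutine and, while that one is waiting, to the event loop, which can run other tasks. The reader coroutine itself does not move on to the next message while it is sleeping, and since it is the only task here, nothing else runs in the meantime.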
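To make the difference concrete, here is a minimal sketch that uses only asyncio, with no Redis involved. The names work, sequential, concurrent and demo are made up for this illustration, and the 0.2 second delays are arbitrary. It times a loop that awaits each sleep in turn against one that first schedules every sleep as its own task:

```python
import asyncio
import time


async def work(delay):
    # Stand-in for handling one message: it only sleeps.
    await asyncio.sleep(delay)
    return delay


async def sequential(delays):
    # Each await suspends this coroutine until that sleep finishes,
    # so the next iteration cannot start any earlier.
    start = time.monotonic()
    for d in delays:
        await work(d)
    return time.monotonic() - start


async def concurrent(delays):
    # Every call is scheduled as a separate task first, so the sleeps
    # overlap; gather() then waits for all of them to finish.
    start = time.monotonic()
    tasks = [asyncio.ensure_future(work(d)) for d in delays]
    await asyncio.gather(*tasks)
    return time.monotonic() - start


async def demo():
    delays = [0.2, 0.2, 0.2]
    return await sequential(delays), await concurrent(delays)


if __name__ == "__main__":
    seq, conc = asyncio.run(demo())
    print('sequential: {:.2f}s, concurrent: {:.2f}s'.format(seq, conc))
```

The sequential version should take roughly the sum of the delays and the concurrent one roughly the longest single delay, which matches the behaviour described in the question.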

What you should do is use ensure_future to handle each message in a separate task, so that the reader loop can go straight back to waiting for the next message:

import asyncio
import aioredis

async def handle_msg(msg):
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')

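async def reader(ch):
    # wait_message() returns False once the channel is closed
    while (await ch.wait_message()):
        msg = await ch.get_json()
        # Schedule the handler as its own task and do not await it,
        # so the loop returns to waiting for the next message right away
        asyncio.ensure_future(handle_msg(msg))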

async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    # reader() returns nothing, so there is no result to keep
    await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 
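One thing to watch with fire-and-forget tasks is that nothing waits for them: if the reader loop ever ends, handlers that are still running can be cut off when the event loop closes. The following is only a sketch of one way to deal with that, not part of aioredis. It assumes an asyncio.Queue as a stand-in for the Redis channel, a hypothetical None sentinel meaning "channel closed", and a log list added purely to make the ordering visible. The reader keeps a reference to every task it starts and waits for the stragglers before returning:

```python
import asyncio


async def handle_msg(msg, log):
    # Record when the handler starts and ends so the overlap is visible.
    log.append(('start', msg['id']))
    await asyncio.sleep(msg['sleep_for'])
    log.append(('end', msg['id']))


async def reader(queue, log):
    pending = set()
    while True:
        msg = await queue.get()
        if msg is None:  # hypothetical sentinel: "channel closed"
            break
        task = asyncio.ensure_future(handle_msg(msg, log))
        # Keep a reference so the task can be awaited later, and drop
        # it automatically once the task has finished.
        pending.add(task)
        task.add_done_callback(pending.discard)
    # Wait for handlers that are still running before returning.
    if pending:
        await asyncio.gather(*pending)


async def demo():
    queue = asyncio.Queue()
    log = []
    for msg in ({'id': 1, 'sleep_for': 0.2}, {'id': 2, 'sleep_for': 0.05}):
        queue.put_nowait(msg)
    queue.put_nowait(None)
    await reader(queue, log)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both handlers start before either one finishes, and the message with the shorter sleep ends first. On Python 3.7 and later, asyncio.create_task can be used in place of ensure_future when the argument is a coroutine.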
Jashandeep Sohi answered Oct 23 '25 09:10


