
How to wait on an error/close event for a socket with asyncio?

I am using a networking library that provides a wrapper for using its coroutine functions with asyncio. When I wrote a test that randomly closes the connection (to see if my program is resilient under bad conditions), I found that it hangs indefinitely.

At first it looked like a bug in the library's wrapper, because the program hangs waiting for the callback registered with either loop.add_reader() or loop.add_writer(). However, I could not find any way to be notified when the socket is closed.

This minimal program reproduces the problem:

import asyncio
import socket

async def kill_later(c):
    await asyncio.sleep(0.1)
    c.close()

async def main():
    loop = asyncio.get_running_loop()

    c = socket.create_connection(('www.google.com', 80))
    c.setblocking(0)

    ev = asyncio.Event()
    loop.add_reader(c, ev.set)

    # Closes the socket after 0.1 s:
    asyncio.create_task(kill_later(c))

    print("waiting...")

    #### ↓ THIS WAITS FOREVER ↓ ####
    await ev.wait()

asyncio.run(main())

My question: how can I be notified by the asyncio loop that a socket has been closed?

EDIT: due to popular demand, I made the socket non-blocking, but it makes no difference, because add_reader() doesn't perform any I/O on the socket; it merely watches for when it is ready.

lvella asked Sep 15 '25 03:09



1 Answer

Your test program is flawed. Calling c.close() doesn't emulate the socket being closed by the other end; it closes your own file descriptor and makes it inaccessible. You can think of close(fd) as breaking the link between the number fd and the underlying OS resource. After that, reading from or polling fd is meaningless because the number no longer refers to anything. As a result, epoll() can't and won't report a closed file descriptor as "readable".
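
A minimal sketch of that point, assuming CPython's socket module: once a socket object is closed locally, its fileno() becomes -1, so there is no descriptor left for the event loop to watch. The helper name local_close_demo and the use of socket.socketpair() are only for illustration and are not part of the question's code.

```python
import socket

def local_close_demo():
    """Return the descriptor number before and after a local close()."""
    a, b = socket.socketpair()   # two connected sockets, no network needed
    fd_before = a.fileno()       # a valid descriptor (small non-negative int)
    a.close()                    # breaks the link between the number and the OS resource
    fd_after = a.fileno()        # the closed socket object now reports -1
    b.close()
    return fd_before, fd_after

if __name__ == "__main__":
    print(local_close_demo())
```

The old descriptor number may even be reused by the next socket or file the process opens, which is another reason polling it after close() tells you nothing about the original connection.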

The way to test the condition you want to test is by having the other end close the connection. The easiest way to do that is by spawning another process or thread as a mock server. For example:

import asyncio, threading, socket, time

def start_mock_server():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('localhost', 10000))
    s.listen(1)
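    def serve():
        # Accept one connection, hold it open briefly, then close it so the
        # client sees the connection being closed by the other end.
        conn, addr = s.accept()
        time.sleep(1)
        conn.close()
        s.close()
    threading.Thread(target=serve).start()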

async def main():
    loop = asyncio.get_running_loop()
    start_mock_server()

    c = socket.create_connection(('localhost', 10000))
    c.setblocking(0)

    ev = asyncio.Event()
    loop.add_reader(c.fileno(), ev.set)

    print("waiting...")
    await ev.wait()
    print("done")

asyncio.run(main())
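
Once the reader callback fires, you still need to tell "data arrived" apart from "the peer closed". A rough sketch of one way to do that, assuming a selector-based event loop (the default on Unix) and using socket.socketpair() in place of the mock server: a recv() that returns b"" signals end-of-file, meaning the other end closed the connection. The function name wait_for_peer_close is made up for this example.

```python
import asyncio
import socket

async def wait_for_peer_close():
    loop = asyncio.get_running_loop()
    ours, theirs = socket.socketpair()
    ours.setblocking(False)

    readable = asyncio.Event()
    loop.add_reader(ours.fileno(), readable.set)

    # Simulate the remote end hanging up shortly after we start waiting.
    loop.call_later(0.1, theirs.close)

    await readable.wait()
    # Stop watching before closing, so the loop never polls a dead descriptor.
    loop.remove_reader(ours.fileno())

    data = ours.recv(1024)  # b"" here means the peer closed its end (EOF)
    ours.close()
    return data

if __name__ == "__main__":
    print(asyncio.run(wait_for_peer_close()))
```

Calling remove_reader() before closing your own socket is the counterpart of the original problem: the loop should never be left watching a descriptor you have already closed.
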
user4815162342 answered Sep 17 '25 17:09
