I am trying to learn how to use AsyncIO in Python 3.7 and I am still a little confused by its principles.
My goal is to write a simple chat program; however, I need to use a ring network topology -- each node only knows about its two neighbours. When a message is sent, the nodes pass it along until it reaches the sender again. This means that each node is basically a client and a server at the same time.
I also need to be able to detect dead nodes, so that my ring does not break.
I thought it might be a good solution for each node to have a separate connection for every neighbour -- successor and predecessor.
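class Node:
    ...
    def run(self):
        ...
        # Outgoing connection to the successor; Client implements asyncio.Protocol
        s = loop.create_connection(lambda: Client(...), addr1, port1)
        # Listening server for the predecessor; Server implements asyncio.Protocol
        p = loop.create_server(lambda: Server(...), addr2, port2)
        successor = loop.run_until_complete(s)    # a (transport, protocol) pair
        predecessor = loop.run_until_complete(p)  # the listening server object
        loop.run_forever()
        ...
    ...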
Server and Client are classes that implement asyncio.Protocol.
The reason I wanted to do it this way is that a message travelling around the ring is always sent from the predecessor to the successor. In the connection_lost method of the Server protocol that handles my predecessor, I can detect that it has disconnected and send a message (through the whole ring) to its predecessor, asking it to connect to me.
I would like to be able to send a message that I received from my predecessor further on to my successor. I would also like to be able to send a message with my address to my successor in case my predecessor dies (this message would be sent from predecessor's Server.connection_lost() and would be passed all the way to my dead predecessor's predecessor).
My question is: Can I pass the received data from predecessor to successor? If not, what would be a better implementation of this program that uses AsyncIO and the ring topology?
For anyone new to AsyncIO having the same problem, I found the solution myself.
First of all, it is better to use the high-level part of AsyncIO -- streams. Calling loop.create_connection and loop.create_server is considered low-level (which I misunderstood at first).
The high-level alternative to create_connection is asyncio.open_connection, which returns a tuple of asyncio.StreamReader and asyncio.StreamWriter that you can use to read from and write to the open connection. You can detect the loss of the connection when the data read from the StreamReader equals b'', or when you catch a ConnectionError while writing to the StreamWriter (in practice the exception surfaces from await writer.drain() rather than from write() itself).
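A minimal sketch of the client side described above, assuming a peer that sends some bytes and then closes. The names watch_successor and demo_client, the 1024-byte read size and the throwaway local server are hypothetical and only there to make the example self-contained.

```python
import asyncio


async def watch_successor(host, port):
    """Read from a connection until the peer closes it; return the chunks seen."""
    reader, writer = await asyncio.open_connection(host, port)
    chunks = []
    try:
        while True:
            data = await reader.read(1024)  # returns b'' once the peer has closed
            if data == b"":
                break  # connection lost: the neighbour is gone
            chunks.append(data)
    except ConnectionError:
        pass  # abrupt loss of the connection
    finally:
        writer.close()
    return chunks


async def demo_client():
    # Hypothetical stand-in for a neighbour: sends one line, then hangs up.
    async def greet_and_close(reader, writer):
        writer.write(b"hello\n")
        await writer.drain()
        writer.close()

    server = await asyncio.start_server(greet_and_close, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]  # port 0 lets the OS pick a free port
    chunks = await watch_successor("127.0.0.1", port)
    server.close()
    await server.wait_closed()
    return b"".join(chunks)
```

Running asyncio.run(demo_client()) should return the bytes the stand-in neighbour sent; the loop in watch_successor ends as soon as read() yields b''.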
The high-level alternative to create_server is asyncio.start_server, which must be given a callback that is called once for every new connection made to the server. The callback receives a StreamReader and a StreamWriter as arguments, and the incoming data is then read from the reader inside the callback. The loss of the connection can again be detected by reading b'' or by catching a ConnectionError when writing to the writer.
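A minimal sketch of such a callback, assuming newline-delimited messages (the original does not specify a message format). The names handle_predecessor and demo_server are hypothetical, and the asyncio.Event is only used so the demo knows when the handler has seen the end of the stream.

```python
import asyncio


async def demo_server():
    received = []
    done = asyncio.Event()

    async def handle_predecessor(reader, writer):
        # Called once per accepted connection; loop to receive many messages.
        while True:
            line = await reader.readline()
            if line == b"":  # EOF: the peer closed the connection
                break
            received.append(line.rstrip(b"\n"))
        writer.close()
        done.set()

    server = await asyncio.start_server(handle_predecessor, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]  # port 0 lets the OS pick a free port

    # Play the role of the predecessor: send two messages, then disconnect.
    _, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"first\nsecond\n")
    await writer.drain()
    writer.close()

    await done.wait()  # wait until the handler has observed the disconnect
    server.close()
    await server.wait_closed()
    return received
```

Calling asyncio.run(demo_server()) should return both messages in order, which shows that a single callback invocation handles the whole lifetime of one connection.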
Multiple connections can be handled by coroutines. There can be a coroutine for the server part (which accepts the connection from one of the neighbors in the ring topology) and a coroutine for the client part (which opens a connection to the other neighbor in the ring). The Node class can look like this:
import asyncio
class Node:
    ...
    async def run(self):
        ...
        # Client part: connect to the successor in the ring
        self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
        # Schedule the server and client parts so that they run concurrently
        server_coro = asyncio.create_task(self.server_init())
        client_coro = asyncio.create_task(self.client_method())
        await client_coro
        await server_coro
        ...
    async def server_init(self):
        # Server part: accept the connection from the predecessor
        server = await asyncio.start_server(self.server_callback, self.IP, self.port)
        async with server:
            await server.serve_forever()
    async def client_method(self):
        ...
        try:
            # read() without a size waits for EOF; read(n) or readline() return earlier
            data = await self.next_reader.read()
        except ConnectionError:
            ...
    ...
Note that I am using asyncio.create_task for the coroutines and (not shown in the code listing) asyncio.run(node.run()), which are considered high-level alternatives to asyncio.ensure_future() and loop.run_forever(). Both of these were added in Python 3.7, and asyncio.run() was marked as provisional in that release, so by the time you read this, it might already have changed.
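To come back to the original question of passing data from the predecessor on to the successor: with streams, the server callback can simply write to the successor's StreamWriter. The following is only a sketch under assumptions, not a full ring implementation: RelayNode, on_predecessor, the sink server and the newline-delimited framing are all hypothetical, and closing the successor connection on EOF is done here only so the demo terminates.

```python
import asyncio


class RelayNode:
    """Hypothetical node that forwards each line from its predecessor to its successor."""

    def __init__(self, next_host, next_port):
        self.next_host = next_host
        self.next_port = next_port
        self.next_writer = None

    async def connect_successor(self):
        _, self.next_writer = await asyncio.open_connection(self.next_host, self.next_port)

    async def on_predecessor(self, reader, writer):
        while True:
            line = await reader.readline()
            if line == b"":
                break  # predecessor is gone; ring repair would start here
            self.next_writer.write(line)  # pass the message on
            await self.next_writer.drain()
        writer.close()
        self.next_writer.close()  # demo only: propagate the shutdown


async def demo_relay():
    got = []
    finished = asyncio.Event()

    async def sink(reader, writer):
        # Stand-in for the successor: records everything it receives.
        while True:
            line = await reader.readline()
            if line == b"":
                break
            got.append(line)
        writer.close()
        finished.set()

    sink_server = await asyncio.start_server(sink, "127.0.0.1", 0)
    sink_port = sink_server.sockets[0].getsockname()[1]

    node = RelayNode("127.0.0.1", sink_port)
    await node.connect_successor()
    node_server = await asyncio.start_server(node.on_predecessor, "127.0.0.1", 0)
    node_port = node_server.sockets[0].getsockname()[1]

    # Stand-in for the predecessor: send one message, then disconnect.
    _, writer = await asyncio.open_connection("127.0.0.1", node_port)
    writer.write(b"hello ring\n")
    await writer.drain()
    writer.close()

    await finished.wait()
    for server in (node_server, sink_server):
        server.close()
        await server.wait_closed()
    return got
```

asyncio.run(demo_relay()) should return the single message as it arrived at the successor. In a real ring the node would keep the successor connection open and react to the predecessor's disconnect instead of shutting down.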
I'm not an AsyncIO expert, so there might be a better, cleaner way to do this (if you know it, please share it).