I'm currently doing my first steps with asyncio in Python 3.5 and there is one problem that's bugging me. Obviously I haven't fully understood coroutines...
Here is a simplified version of what I'm doing.
In my class I have an open() method that creates a new thread. Within that thread I create a new event loop and a socket connection to some host. Then I let the loop run forever.
def open(self):
    # create thread
    self.thread = threading.Thread(target=self._thread)
    self.thread.start()
    # wait for connection
    while self.protocol is None:
        time.sleep(0.1)
def _thread(self):
    # create loop, connection and run forever
    self.loop = asyncio.new_event_loop()
    coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
                                       'somehost.com', 1234)
    self.loop.run_until_complete(coro)
    self.loop.run_forever()
Stopping the connection is now quite simple, I just stop the loop from the main thread:
loop.call_soon_threadsafe(loop.stop)
Unfortunately I need to do some cleanup, especially I need to empty a queue before disconnecting from the server. So I tried something like this stop() method in MyProtocol:
class MyProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self._loop = loop
        self._queue = []
    async def stop(self):
        # wait for all queues to empty
        while self._queue:
            await asyncio.sleep(0.1)
        # disconnect
        self.close()
        self._loop.stop()
The queue gets emptied from within the protocol's data_received() method, so I just want to wait for that to happen using the while loop with the asyncio.sleep() call. Afterwards I close the connection and stop the loop.
But how do I call this method from the main thread and wait for it? I tried the following, but none of them seem to work (protocol is the currently used instance of MyProtocol):
loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)
Can anyone please help me here? Thanks!
Basically you want to schedule coroutine on loop of different thread. You could use run_coroutine_threadsafe:
future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop)
future.result()  # wait for results
Or the old style async like in https://stackoverflow.com/a/32084907/681044
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
t = Thread(target=f, args=(loop,))
t.start()    
@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')
loop.call_soon_threadsafe(asyncio.async, g())
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With