I am writing a client-server application. While connected, the client sends the server a "heartbeat" signal, for example every second. On the server side I need a mechanism to which I can add tasks (or coroutines, or something else) to be executed asynchronously. Moreover, I want to cancel a client's tasks when it stops sending that "heartbeat" signal.
In other words, when the server starts a task, the task has a kind of timeout or TTL, for example 3 seconds. When the server receives the "heartbeat" signal, it resets the timer for another 3 seconds, until the task is done or the client has disconnected (stopped sending the signal).
Here is an example of canceling a task, from the asyncio tutorial on pymotw.com. But here the task is canceled before the event loop has started, which is not suitable for me.
import asyncio
async def task_func():
    print('in task_func')
    return 'the result'
event_loop = asyncio.get_event_loop()
try:
    print('creating task')
    task = event_loop.create_task(task_func())
    print('canceling task')
    task.cancel()
    print('entering event loop')
    event_loop.run_until_complete(task)
    print('task: {!r}'.format(task))
except asyncio.CancelledError:
    print('caught error from cancelled task')
else:
    print('task result: {!r}'.format(task.result()))
finally:
    event_loop.close()
You can use asyncio Task wrappers to execute a task via the ensure_future() function.
ensure_future will automatically wrap your coroutine in a Task wrapper and schedule it on your event loop. The Task wrapper will then also ensure that the coroutine 'cranks over' from one await statement to the next (or until the coroutine finishes).
In other words, just pass a regular coroutine to ensure_future and assign the resulting Task object to a variable. You can then call Task.cancel() when you need to stop it. (On Python 3.7+, asyncio.create_task() does the same for coroutines and is the preferred way to create a Task.)
import asyncio

# Placeholder flag: the connection handler is expected to update it.
heartbeat = True

async def task_func():
    print('in task_func')
    # If the task needs to run for a while, it needs an await statement
    # to provide a pause point so that other coroutines can run in the meantime.
    # asyncio.sleep() stands in for some_db_or_long_running_background_coroutine().
    await asyncio.sleep(3600)
    # Or, if this is a one-off thing, just return the result,
    # but then you don't really need a Task wrapper...
    # return 'the result'

async def my_app():
    my_task = None
    while True:
        await asyncio.sleep(0)
        # listen for trigger / heartbeat
        if heartbeat and my_task is None:
            my_task = asyncio.ensure_future(task_func())
        # also listen for termination of heartbeat / connection
        elif not heartbeat and my_task:
            # done() also covers a task that finished normally,
            # which cancelled() would never report
            if not my_task.done():
                my_task.cancel()
            else:
                my_task = None

# Start the event loop and run my_app() until it is interrupted.
asyncio.run(my_app())
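Since the question is specifically about cancelling after the event loop has started, here is a minimal sketch of that case. The names worker, main and log are made up for illustration, and the sleep durations are arbitrary. It shows that the cancellation arrives inside the task as asyncio.CancelledError at an await, which gives the task a chance to clean up.

```python
import asyncio


async def worker(log):
    # Hypothetical long-running task; `log` just records what happened.
    try:
        while True:
            log.append('working')
            # Cancellation is delivered while the task is suspended at an await.
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append('cleanup')
        raise  # re-raise so the Task ends up marked as cancelled


async def main():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0.05)  # the loop is running and the task has started
    task.cancel()
    try:
        await task  # wait until the cancellation has been processed
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()
```

Running asyncio.run(main()) should return a log that starts with 'working' and ends with 'cleanup', together with True for task.cancelled().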
Note that Tasks are meant for long-running work that needs to keep going in the background without interrupting the main flow. If all you need is a quick one-off call, then just call the function directly instead.
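For the "reset the timer on every heartbeat" part of the question, one possible approach is to pair the Task with a loop.call_later() timer that is restarted on each heartbeat. This is only a sketch under assumptions: HeartbeatGuard, long_job and demo are hypothetical names, the guard has to be created while the loop is running, and the TTL values are shortened so the example finishes quickly.

```python
import asyncio


class HeartbeatGuard:
    """Hypothetical helper: cancels `task` unless heartbeat() is called every `ttl` seconds."""

    def __init__(self, task, ttl):
        self._task = task
        self._ttl = ttl
        self._loop = asyncio.get_running_loop()
        # Schedule the cancellation; each heartbeat pushes it back.
        self._handle = self._loop.call_later(ttl, task.cancel)
        # Stop the timer once the task finishes on its own.
        task.add_done_callback(lambda _task: self._handle.cancel())

    def heartbeat(self):
        """Reset the timer for another `ttl` seconds."""
        self._handle.cancel()
        if not self._task.done():
            self._handle = self._loop.call_later(self._ttl, self._task.cancel)


async def long_job(seconds):
    await asyncio.sleep(seconds)  # stand-in for the real work
    return 'the result'


async def demo(beats, job_seconds=10, ttl=0.3, interval=0.1):
    """Send `beats` heartbeats, then go silent and report the task's fate."""
    task = asyncio.create_task(long_job(job_seconds))
    guard = HeartbeatGuard(task, ttl)
    for _ in range(beats):
        await asyncio.sleep(interval)
        guard.heartbeat()
    try:
        await task
    except asyncio.CancelledError:
        return 'cancelled'
    return 'finished'
```

With these values, asyncio.run(demo(beats=3)) should return 'cancelled' because the heartbeats stop long before the job is done, while a job shorter than the TTL should return 'finished'. In a real server, heartbeat() would be called from whatever handler receives the client's signal.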