
How to cancel with and without interruption using Coroutines on Android, including auto-cancellation according to lifecycle?

Background

I'm having problems migrating from the simple (deprecated) AsyncTask and Executors to Kotlin Coroutines on Android.

The problem

I can't find out how to do, with Kotlin Coroutines, the basic things I could do with AsyncTask and even with Executors.

In the past, I could choose to cancel a task with or without thread interruption. Now, for some reason, a task that I create with Coroutines can only be cancelled without interruption, which means that if it runs code that calls "sleep" (not always code I wrote), it won't be interrupted.

I also remember being told somewhere that Coroutines are very nice on Android because they automatically cancel all tasks tied to an Activity when it is destroyed. I couldn't find an explanation of how to do it, though.

What I've tried and found

For the Coroutines task (called Deferred, from what I can see), I think I've read that I have to choose at creation time which kind of cancellation it will support, and that for some reason I can't have both. I'm not sure this is true, but I still want to find out, since I'd like to have both for the smoothest migration. With AsyncTask, I used to add the tasks to a set (and remove them when cancelled) so that when the Activity finished, I could go over them and cancel them all. I even made a nice class to do it for me.

This is what I've created to test this:

class MainActivity : AppCompatActivity() {
    val uiScope = CoroutineScope(Dispatchers.Main)
    val bgDispatcher: CoroutineDispatcher = Dispatchers.IO

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        loadData()
    }

    private fun loadData(): Job = uiScope.launch {
        Log.d("AppLog", "loadData")
        val task = async(bgDispatcher) {
            Log.d("AppLog", "bg start")
            try {
                Thread.sleep(5000L) // stands in for any blocking code, including code I can't edit
            } catch (e: Exception) {
                Log.d("AppLog", "$e")
            }
            Log.d("AppLog", "bg done this.isActive?${this.isActive}")
            return@async 123
        }
        //simulation of cancellation for any reason, sadly without the ability to cancel with interruption
        Handler(mainLooper).postDelayed({
            task.cancel()
        }, 2000L)
        val result: Int = task.await()
        Log.d("AppLog", "got result:$result") // this is called even after an orientation change, which I might not want in an Activity
    }
}

build.gradle file:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1"

The questions

  1. Is it possible on Coroutines to have a task that I can cancel with and without thread interruption?
  2. What should be added so that when the Activity dies (on an orientation change, for example), the task is auto-cancelled (with the choice of with or without interruption)? I guess I could use a solution similar to the one I had for AsyncTask, but I remember being told there is a nice way to do it for Coroutines too.
asked Oct 16 '25 01:10 by android developer


1 Answer

Coroutines aren't magic. They're implemented using state machines, and may have a number of suspension points. This is all explained in the original Coroutines KEEP.

Cancellation happens at those suspension points. A coroutine cannot be cancelled at any point other than a suspension point (in normal execution at least). If you use Thread.sleep, there are no suspension points. You should use delay instead of sleep, because delay introduces a suspension point. If you're doing a long operation, you can add a few yield() calls to introduce suspension points and make your coroutine cancellable.
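As a minimal sketch of that point (the function name countCooperatively and the timings are made up for illustration), the snippet below swaps Thread.sleep for delay and adds yield() to a long-running loop so that both become cancellable:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: a long-running loop that calls yield() periodically.
// Each yield() is a suspension point, so a cancelled coroutine stops there
// with a CancellationException instead of running to the end.
suspend fun countCooperatively(limit: Int): Int {
    var count = 0
    while (count < limit) {
        count++
        if (count % 1_000 == 0) yield()
    }
    return count
}

fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        try {
            delay(5_000L) // suspending replacement for Thread.sleep(5000L)
            println("finished")
        } catch (e: CancellationException) {
            println("cancelled during delay")
            throw e // rethrow so the coroutine machinery sees the cancellation
        }
    }
    delay(100L) // let the child coroutine reach its delay() call
    job.cancelAndJoin()
    println("counted to ${countCooperatively(2_500)}")
}
```

Without the yield() call the loop would still finish its work after cancel(), because nothing inside it ever checks the job's state.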

From the docs:

Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled

Calling a suspend function automatically introduces a suspension point.

As @CommonsWare pointed out, there is rarely a reason to create your own coroutine scope. In activities and fragments, or any component tied to a lifecycle, you should use lifecycleScope. In ViewModels, there's viewModelScope.
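A rough sketch of the lifecycleScope approach, reusing the Activity from the question. It assumes the androidx.lifecycle:lifecycle-runtime-ktx artifact (2.2.0 or later, as far as I know) is on the classpath, and loadValue is a hypothetical stand-in for the blocking work:

```kotlin
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // lifecycleScope is cancelled automatically when the Activity is destroyed,
        // which includes the destroy/recreate cycle of an orientation change.
        lifecycleScope.launch {
            val result: Int = withContext(Dispatchers.IO) {
                loadValue()
            }
            // Not reached if the scope was cancelled while loadValue() was running:
            // withContext then throws CancellationException instead of returning.
            Log.d("AppLog", "got result:$result")
        }
    }

    // Hypothetical stand-in for blocking work that cannot be edited.
    private fun loadValue(): Int {
        Thread.sleep(5000L)
        return 123
    }
}
```

Note that this is cancellation without interruption: the background thread still finishes its sleep, but the result is discarded and no code after withContext runs in the dead Activity.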

EDIT:

I've tried adapting the source of runInterruptible so that it interrupts only under a certain condition: the thread is interrupted only when the cancel cause is an instance of a custom exception class, InterruptionException; any other cancellation skips the thread interruption. I've replaced the atomicfu constructs with AtomicInteger, since I assumed your target was only the JVM. You'll need to opt in to the internal coroutines API by adding the -Xopt-in=kotlinx.coroutines.InternalCoroutinesApi compiler flag.

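import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*

suspend fun <T> runInterruptibleCancellable(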
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T = withContext(context) {
    try {
        val threadState = ThreadState(coroutineContext.job)
        threadState.setup()
        try {
            block()
        } finally {
            threadState.clearInterrupt()
        }
    } catch (e: InterruptedException) {
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
    }
}

private const val WORKING = 0
private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3

private class ThreadState(private val job: Job) : CompletionHandler {
    /*
       === States ===

       WORKING: running normally
       FINISHED: completed normally
       INTERRUPTING: canceled, going to interrupt this thread
       INTERRUPTED: this thread is interrupted

       === Possible Transitions ===

       +----------------+         register job       +-------------------------+
       |    WORKING     |   cancellation listener    |         WORKING         |
       | (thread, null) | -------------------------> | (thread, cancel handle) |
       +----------------+                            +-------------------------+
               |                                                |   |
               | cancel                                  cancel |   | complete
               |                                                |   |
               V                                                |   |
       +---------------+                                        |   |
       | INTERRUPTING  | <--------------------------------------+   |
       +---------------+                                            |
               |                                                    |
               | interrupt                                          |
               |                                                    |
               V                                                    V
       +---------------+                              +-------------------------+
       |  INTERRUPTED  |                              |         FINISHED        |
       +---------------+                              +-------------------------+
    */
    private val _state = AtomicInteger(WORKING)
    private val targetThread = Thread.currentThread()

    // Registered cancellation handler
    private var cancelHandle: DisposableHandle? = null

    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // Either we successfully stored it or it was immediately cancelled
        while (true) {
            when (val state = _state.get()) {
                // Happy-path, move forward
                WORKING -> if (_state.compareAndSet(state, WORKING)) return
                // Immediately cancelled, just continue
                INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(state)
            }
        }
    }

    fun clearInterrupt() {
        /*
         * Do not allow an untriggered interrupt to leak
         */
        while (true) {
            when (val state = _state.get()) {
                WORKING -> if (_state.compareAndSet(state, FINISHED)) {
                    cancelHandle?.dispose()
                    return
                }
                INTERRUPTING -> {
                    /*
                     * Spin: the cancellation mechanism is interrupting our thread right now,
                     * so we have to wait for it and then clear the interrupt status
                     */
                }
                INTERRUPTED -> {
                    // Clear it and bail out
                    Thread.interrupted()
                    return
                }
                else -> invalidState(state)
            }
        }
    }

    // Cancellation handler
    override fun invoke(cause: Throwable?) {
        if (cause is InterruptionException) {
            while (true) {
                when (val state = _state.get()) {
                    // WORKING: try to transition the state and interrupt the thread
                    WORKING -> {
                        if (_state.compareAndSet(state, INTERRUPTING)) {
                            targetThread.interrupt()
                            _state.set(INTERRUPTED)
                            return
                        }
                    }
                    // FINISHED: the block is already complete; INTERRUPTING or INTERRUPTED: nothing left to do
                    FINISHED, INTERRUPTING, INTERRUPTED -> return
                    else -> invalidState(state)
                }
            }
        }
    }

    private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}

class InterruptionException(cause: Throwable?) : CancellationException() {
    init {
        initCause(cause)
    }
}

fun Job.interrupt(cause: Throwable? = null) {
    this.cancel(InterruptionException(cause))
}

suspend fun Job.interruptAndJoin() {
    interrupt()
    return join()
}

You can use the interrupt and interruptAndJoin extension functions to trigger thread interruption; otherwise, use cancel for a non-interrupting cancel. An example:

val scope = CoroutineScope(Dispatchers.IO)
val job = scope.launch {
    runInterruptibleCancellable {
        // some blocking code
        Thread.sleep(1000)
        if (!isActive) {
            println("cancelled")
        } else {
            println("completed")
        }
    }
}
job.invokeOnCompletion {
    if (it is InterruptionException) {
        print("interrupted")
    }
}

runBlocking {
//  job.interruptAndJoin()  // prints "interrupted"
//  job.cancelAndJoin()     // prints "cancelled"
    job.join()              // prints "completed"
}

This example is the only testing I've done. It seems to work, but I don't know if it leaks or if it's thread-safe. I'm really far outside my expertise, to be honest. Please don't use it in production without further confirmation that it works.
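If always interrupting on cancel is acceptable, the stock runInterruptible from kotlinx.coroutines may be enough on its own, with no internal API involved. A small sketch (the function name cancelBlockingSleep and the timings are only illustrative):

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical demo: starts a blocking sleep inside runInterruptible, cancels it,
// and returns how long cancelAndJoin() took, in milliseconds.
fun cancelBlockingSleep(): Long = runBlocking {
    val job = launch(Dispatchers.IO) {
        runInterruptible {
            // Blocking call; the thread is interrupted when the job is cancelled,
            // and the InterruptedException is rethrown as a CancellationException.
            Thread.sleep(5_000L)
        }
    }
    delay(200L) // give the blocking call time to start
    measureTimeMillis { job.cancelAndJoin() }
}

fun main() {
    println("cancelAndJoin took ${cancelBlockingSleep()} ms")
}
```

Here cancelAndJoin() should return well before the five-second sleep would have finished, because the sleeping thread is interrupted rather than left to run to completion.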

answered Oct 17 '25 13:10 by Nicolas