I'm having problems migrating from the simple (deprecated) AsyncTask and Executors to Kotlin Coroutines on Android. I can't find how to do the basic things I could do with AsyncTask, and even with Executors, using Kotlin Coroutines.
In the past, I could choose to cancel a task with or without thread interruption. Now, for some reason, a task that I create with Coroutines can only be cancelled without interruption, which means that if I run some code that has even "sleep" in it (not always written by me), it won't be interrupted.
I also remember being told somewhere that Coroutines are very nice on Android, as they automatically cancel all tasks started in an Activity once it is destroyed. I couldn't find an explanation of how to do that, though.
For the Coroutines task (called Deferred, according to what I see), I think I've read that when I create it, I have to choose which kind of cancellation it will support, and that for some reason I can't have both. I'm not sure if this is true, but I still want to find out, as I want to have both for the best migration. With AsyncTask, I used to add the tasks to a set (and remove them when cancelled) so that when the Activity finished, I could go over them and cancel them all. I even made a nice class to do it for me.
This is what I've created to test this:
class MainActivity : AppCompatActivity() {
    val uiScope = CoroutineScope(Dispatchers.Main)
    val bgDispatcher: CoroutineDispatcher = Dispatchers.IO

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        loadData()
    }

    private fun loadData(): Job = uiScope.launch {
        Log.d("AppLog", "loadData")
        val task = async(bgDispatcher) {
            Log.d("AppLog", "bg start")
            try {
                Thread.sleep(5000L) //this could be any executing of code, including things not editable
            } catch (e: Exception) {
                Log.d("AppLog", "$e")
            }
            Log.d("AppLog", "bg done this.isActive?${this.isActive}")
            return@async 123
        }
        //simulation of cancellation for any reason, sadly without the ability to cancel with interruption
        Handler(mainLooper).postDelayed({
            task.cancel()
        }, 2000L)
        val result: Int = task.await()
        Log.d("AppLog", "got result:$result") // this is called even if you change orientation, which I might not want when done in Activity
    }
}
build.gradle file:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1"
Coroutines aren't magic. They're implemented using state machines, and may have a number of suspension points. This is all explained in the original Coroutines KEEP.
Cancellation happens at those suspension points. A coroutine cannot be cancelled at any point other than a suspension point (in normal execution, at least). If you use Thread.sleep, there are no suspension points. You should use delay instead of sleep, which introduces a suspension point. If you're doing a long operation, you can add a few yield() calls to add suspension points and make your coroutine cancellable.
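A minimal sketch of the difference, runnable on a plain JVM with kotlinx-coroutines-core on the classpath. The helper name finishedDespiteCancel and the timings are made up for illustration; it launches the given work, cancels it after 100 ms, and reports whether the work still ran to its end.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs [work] in a coroutine, cancels it after 100 ms,
// and returns true if the work still reached its last line.
fun finishedDespiteCancel(work: suspend () -> Unit): Boolean = runBlocking {
    var finished = false
    val job = launch(Dispatchers.Default) {
        work()
        finished = true
    }
    delay(100)
    job.cancelAndJoin() // cancel, then wait until the coroutine has really ended
    finished
}

fun main() {
    // Blocking sleep has no suspension point, so cancellation is never observed.
    println(finishedDespiteCancel { Thread.sleep(500) }) // prints true

    // delay is a cancellable suspension point, so the work stops early.
    println(finishedDespiteCancel { delay(500) }) // prints false

    // Blocking work split into chunks, with yield() between them as a cancellation check.
    println(finishedDespiteCancel {
        repeat(50) {
            Thread.sleep(10)
            yield()
        }
    }) // prints false
}
```

Note that in the first case cancelAndJoin still has to wait for the whole sleep to finish, because nothing inside the block ever checks for cancellation.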
From the docs:
Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled
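Calling a suspend function introduces a potential suspension point, and the suspending functions from kotlinx.coroutines (such as delay and yield) check for cancellation there.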
As @CommonsWare pointed out, there is rarely a reason to create your own coroutine scope. In activities and fragments, or any component tied to a lifecycle, you should use lifecycleScope. In ViewModels, there's viewModelScope.
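As a rough sketch of what that could look like for the Activity in the question: this assumes the androidx.lifecycle:lifecycle-runtime-ktx artifact is on the classpath (it provides the lifecycleScope extension), and it uses delay as a stand-in for the real background work so that the work is actually cancellable.

```kotlin
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // lifecycleScope is cancelled automatically when the Activity is destroyed,
        // so no manual bookkeeping of running tasks is needed.
        lifecycleScope.launch {
            val result = withContext(Dispatchers.IO) {
                delay(5000L) // stand-in for cancellable background work
                123
            }
            // Not reached if the Activity was destroyed (e.g. rotated) before the work finished.
            Log.d("AppLog", "got result:$result")
        }
    }
}
```

The same cooperative rule still applies here: if the background block only calls Thread.sleep, the scope is cancelled on destroy but the block keeps running until the sleep returns.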
EDIT:
I've tried adapting the source of runInterruptible so that it interrupts only under a certain condition: the thread is interrupted only when the cancel cause is an instance of a custom exception class, InterruptionException, and any other cancellation skips the thread interruption. I've replaced the atomicfu constructs with AtomicInteger, since I assumed your target was only the JVM. You'll need to opt in to the internal coroutines API by adding the -Xopt-in=kotlinx.coroutines.InternalCoroutinesApi compiler flag.
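import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

suspend fun <T> runInterruptibleCancellable(
    context: CoroutineContext = EmptyCoroutineContext,
    block: () -> T
): T = withContext(context) {
    try {
        val threadState = ThreadState(coroutineContext.job)
        threadState.setup()
        try {
            block()
        } finally {
            threadState.clearInterrupt()
        }
    } catch (e: InterruptedException) {
        // Surface the interrupt to callers as a regular coroutine cancellation
        throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
    }
}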
private const val WORKING = 0
private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3
private class ThreadState(private val job: Job) : CompletionHandler {
    /*
        === States ===
        WORKING: running normally
        FINISHED: complete normally
        INTERRUPTING: canceled, going to interrupt this thread
        INTERRUPTED: this thread is interrupted

        === Possible Transitions ===
        +----------------+        register job        +-------------------------+
        |    WORKING     |    cancellation listener   |         WORKING         |
        | (thread, null) | -------------------------> | (thread, cancel handle) |
        +----------------+                            +-------------------------+
                |                                                |   |
                | cancel                                  cancel |   | complete
                |                                                |   |
                V                                                |   |
        +---------------+                                        |   |
        | INTERRUPTING  | <--------------------------------------+   |
        +---------------+                                            |
                |                                                    |
                | interrupt                                          |
                |                                                    |
                V                                                    V
        +---------------+                             +-------------------------+
        |  INTERRUPTED  |                             |        FINISHED         |
        +---------------+                             +-------------------------+
    */
    private val _state = AtomicInteger(WORKING)
    private val targetThread = Thread.currentThread()

    // Registered cancellation handler
    private var cancelHandle: DisposableHandle? = null

    fun setup() {
        cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
        // Either we successfully stored it or it was immediately cancelled
        while (true) {
            when (val state = _state.get()) {
                // Happy-path, move forward
                WORKING -> if (_state.compareAndSet(state, WORKING)) return
                // Immediately cancelled, just continue
                INTERRUPTING, INTERRUPTED -> return
                else -> invalidState(state)
            }
        }
    }
    fun clearInterrupt() {
        /*
         * Do not allow an untriggered interrupt to leak
         */
        while (true) {
            when (val state = _state.get()) {
                WORKING -> if (_state.compareAndSet(state, FINISHED)) {
                    cancelHandle?.dispose()
                    return
                }
                INTERRUPTING -> {
                    /*
                     * Spin: the cancellation mechanism is interrupting our thread right now,
                     * so we have to wait for it and then clear the interrupt status
                     */
                }
                INTERRUPTED -> {
                    // Clear it and bail out
                    Thread.interrupted()
                    return
                }
                else -> invalidState(state)
            }
        }
    }
    // Cancellation handler
    override fun invoke(cause: Throwable?) {
        // Only an InterruptionException cause interrupts the thread; a plain cancel() does nothing here
        if (cause is InterruptionException) {
            while (true) {
                when (val state = _state.get()) {
                    // Working -> try to transition the state and interrupt the thread
                    WORKING -> {
                        if (_state.compareAndSet(state, INTERRUPTING)) {
                            targetThread.interrupt()
                            _state.set(INTERRUPTED)
                            return
                        }
                    }
                    // Finished -- runInterruptible is already complete, INTERRUPTING - ignore
                    FINISHED, INTERRUPTING, INTERRUPTED -> return
                    else -> invalidState(state)
                }
            }
        }
    }

    private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
class InterruptionException(cause: Throwable?) : CancellationException() {
    init {
        initCause(cause)
    }
}

fun Job.interrupt(cause: Throwable? = null) {
    this.cancel(InterruptionException(cause))
}

suspend fun Job.interruptAndJoin() {
    interrupt()
    return join()
}
You can use the interrupt and interruptAndJoin extension functions to trigger thread interruption; otherwise, use cancel for a non-interrupting cancel. An example:
val scope = CoroutineScope(Dispatchers.IO)
val job = scope.launch {
    runInterruptibleCancellable {
        // some blocking code
        Thread.sleep(1000)
        if (!isActive) {
            println("cancelled")
        } else {
            println("completed")
        }
    }
}
job.invokeOnCompletion {
    if (it is InterruptionException) {
        print("interrupted")
    }
}
runBlocking {
    // job.interruptAndJoin() // prints "interrupted"
    // job.cancelAndJoin() // prints "cancelled"
    job.join() // prints "completed"
}
This example is the only testing I've done. It seems to work, but I don't know whether it leaks or whether it's thread-safe. To be honest, I'm really far out of my expertise here. Please don't use it in production without further confirmation that it works.
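For comparison, if a particular block only ever needs the interrupting kind of cancellation, the stock runInterruptible from kotlinx.coroutines (the function the code above was adapted from) can be used directly, with no internal API opt-in. A minimal sketch, where the helper name interruptedOnCancel and the timings are made up for illustration:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts a blocking sleep inside runInterruptible, cancels the job
// after 100 ms, and returns true if the blocked thread received an InterruptedException.
fun interruptedOnCancel(): Boolean = runBlocking {
    var interrupted = false
    val job = launch(Dispatchers.IO) {
        runInterruptible {
            try {
                Thread.sleep(5000L) // stand-in for blocking code you cannot edit
            } catch (e: InterruptedException) {
                interrupted = true
                throw e // rethrow so runInterruptible can turn it into a CancellationException
            }
        }
    }
    delay(100)
    job.cancelAndJoin() // a plain cancel() interrupts the thread blocked inside runInterruptible
    interrupted
}

fun main() {
    println(interruptedOnCancel()) // prints true
}
```

With the stock function the choice between the two behaviours is made per block (wrap it in runInterruptible or not), rather than per cancel call as in the adapted version.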