
How to enqueue sequential coroutine blocks

What I'm trying to do

I have an app that's using Room with Coroutines to save search queries in the database. It's also possible to add search suggestions and later on I retrieve this data to show them on a list. I've also made it possible to "pin" some of those suggestions.

My data structure is something like this:

@Entity(
        tableName = "SEARCH_HISTORY",
        indices = [Index(value = ["text"], unique = true)]
)
data class Suggestion(
    @PrimaryKey(autoGenerate = true)
    @ColumnInfo(name = "suggestion_id")
    val suggestionId: Long = 0L,
    val text: String,
    val type: SuggestionType,
    @ColumnInfo(name = "insert_date")
    val insertDate: Calendar
)

enum class SuggestionType(val value: Int) {
    PINNED(0), HISTORY(1), SUGGESTION(2)
}

I have made the "text" field unique to avoid repeated suggestions with different states/types. E.g.: A suggestion that's a pinned item and a previously queried text.

My Coroutine setup looks like this:

private val parentJob: Job = Job()

private val IO: CoroutineContext
    get() = parentJob + Dispatchers.IO

private val MAIN: CoroutineContext
    get() = parentJob + Dispatchers.Main

private val COMPUTATION: CoroutineContext
    get() = parentJob + Dispatchers.Default

And my DAOs are basically like this:

@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insert(obj: Suggestion): Long

@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insert(objList: List<Suggestion>): List<Long>

I also have the following public functions to insert the data into the database:

fun saveQueryToDb(query: String, insertDate: Calendar) {
    if (query.isBlank()) {
        return
    }
    val suggestion = Suggestion(
            text = query,
            insertDate = insertDate,
            type = SuggestionType.HISTORY
    )
    CoroutineScope(IO).launch {
        suggestionDAO.insert(suggestion)
    }
}

fun addPin(pin: String) {
    if (pin.isBlank()) {
        return
    }
    val suggestion = Suggestion(
            text = pin,
            insertDate = Calendar.getInstance(),
            type = SuggestionType.PINNED
    )
    CoroutineScope(IO).launch {
        suggestionDAO.insert(suggestion)
    }
}

fun addSuggestions(suggestions: List<String>) {
    addItems(suggestions, SuggestionType.SUGGESTION)
}

private fun addItems(items: List<String>, suggestionType: SuggestionType) {
    if (items.isEmpty()) {
        return
    }

    CoroutineScope(COMPUTATION).launch {
        val insertDate = Calendar.getInstance()
        val filteredList = items.filterNot { it.isBlank() }
        val suggestionList = filteredList.map { Suggestion(text = it, insertDate = insertDate, type = suggestionType) }
        withContext(IO) {
            suggestionDAO.insert(suggestionList)
        }
    }
}

There are also some other methods, but let's focus on the ones above.

EDIT: All of the methods above are part of a lib that I made. They are not suspend functions because I don't want to force a particular programming style on the user, such as having to use Rx or Coroutines in order to use the lib.

The problem

Let's say I try to add a list of suggestions using the addSuggestions() method stated above, and that I also try to add a pinned suggestion using the addPin() method. The pinned text is also present in the suggestion list.

val list = getSuggestions() // Getting a list somewhere
addSuggestions(list)
addPin(list.first())

When I try to do this, sometimes the pin is added first and is then overwritten by the suggestion present in the list, which makes me think I'm dealing with some sort of race condition. Since the addSuggestions() method has more data to handle, and both methods run in parallel, I believe the addPin() method is completing first.
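To make the effect of ordering concrete, here is a minimal sketch that models the table as an in-memory map keyed by text. It assumes only that REPLACE on the unique "text" column means the last insert for a given text wins. FakeSuggestionTable, Row and the two helper functions are hypothetical stand-ins for illustration, not Room APIs.

```kotlin
enum class SuggestionType { PINNED, HISTORY, SUGGESTION }

// Hypothetical stand-in for a row of SEARCH_HISTORY.
data class Row(val text: String, val type: SuggestionType)

// Hypothetical in-memory model of a table with a unique index on `text`
// and OnConflictStrategy.REPLACE: a later insert with the same text
// replaces the earlier row.
class FakeSuggestionTable {
    private val rows = LinkedHashMap<String, Row>()

    fun insert(row: Row) {
        rows[row.text] = row
    }

    fun typeOf(text: String): SuggestionType? = rows[text]?.type
}

// Intended order: suggestions first, pin last. The pin survives.
fun finalTypeWhenPinRunsLast(): SuggestionType? {
    val table = FakeSuggestionTable()
    listOf("kotlin", "room").forEach { table.insert(Row(it, SuggestionType.SUGGESTION)) }
    table.insert(Row("kotlin", SuggestionType.PINNED))
    return table.typeOf("kotlin")
}

// Racy order: the pin lands first and the bulk insert overwrites it.
fun finalTypeWhenPinRunsFirst(): SuggestionType? {
    val table = FakeSuggestionTable()
    table.insert(Row("kotlin", SuggestionType.PINNED))
    listOf("kotlin", "room").forEach { table.insert(Row(it, SuggestionType.SUGGESTION)) }
    return table.typeOf("kotlin")
}
```

The same two calls produce different final rows depending only on which insert reaches the table last, which matches the intermittent behavior described above.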

Now, my Coroutines knowledge is pretty limited, and I'd like to know if there's a way to enqueue those method calls so that they execute in exactly the order I invoked them. That order must be strongly guaranteed, to avoid overwriting data and getting funky results later on. How can I achieve such behavior?

Mauker asked Sep 05 '25 00:09


2 Answers

I'd follow the Go language slogan "Don't communicate by sharing memory; share memory by communicating". That means that, instead of maintaining atomic variables or jobs and trying to synchronize between them, you model your operations as messages and use a Coroutines actor to handle them.

sealed class Message {
    data class AddSuggestions(val suggestions: List<String>) : Message()
    data class AddPin(val pin: String) : Message()
}

And in your class


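private val parentScope = CoroutineScope(Job())

// capacity = Channel.UNLIMITED lets messages queue up while the actor is busy.
// With the default capacity (0, rendezvous), a non-suspending send fails
// whenever the actor is not already waiting to receive.
private val actor = parentScope.actor<Message>(Dispatchers.IO, capacity = Channel.UNLIMITED) {
    for (msg in channel) {
        when (msg) {
            is Message.AddSuggestions -> TODO("Map to the Suggestion and do suggestionDAO.insert(suggestions)")
            is Message.AddPin -> TODO("Map to the Pin and do suggestionDAO.insert(pin)")
        }
    }
}

// trySend replaces offer, which is deprecated in recent kotlinx.coroutines versions.
fun addSuggestions(suggestions: List<String>) {
    actor.trySend(Message.AddSuggestions(suggestions))
}

fun addPin(pin: String) {
    actor.trySend(Message.AddPin(pin))
}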

An actor queues the messages it receives and processes them one at a time, in FIFO order, so a pin is always written after any suggestions that were enqueued before it.
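As a self-contained illustration of the same idea, the sketch below uses a plain Channel with a single consumer coroutine instead of the actor builder. SuggestionQueue, closeAndDrain and runQueueDemo are hypothetical names, and an in-memory map stands in for suggestionDAO, so treat this as a sketch under those assumptions rather than a drop-in implementation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class Message {
    data class AddSuggestions(val suggestions: List<String>) : Message()
    data class AddPin(val pin: String) : Message()
}

// Hypothetical wrapper: one unbounded channel, one consumer coroutine.
class SuggestionQueue(scope: CoroutineScope) {
    private val channel = Channel<Message>(Channel.UNLIMITED)

    // Stand-in for the DAO (text -> type); only the consumer writes to it.
    val stored = LinkedHashMap<String, String>()

    // The single consumer handles messages strictly in the order they were sent.
    private val worker: Job = scope.launch {
        for (msg in channel) {
            when (msg) {
                is Message.AddSuggestions -> {
                    msg.suggestions.forEach { stored[it] = "SUGGESTION" }
                }
                is Message.AddPin -> {
                    stored[msg.pin] = "PINNED"
                }
            }
        }
    }

    // Non-suspending API: trySend succeeds on an open unlimited channel.
    fun addSuggestions(suggestions: List<String>) {
        channel.trySend(Message.AddSuggestions(suggestions))
    }

    fun addPin(pin: String) {
        channel.trySend(Message.AddPin(pin))
    }

    // Stops accepting messages and waits until the queue has been processed.
    suspend fun closeAndDrain() {
        channel.close()
        worker.join()
    }
}

fun runQueueDemo(): Map<String, String> = runBlocking {
    val queue = SuggestionQueue(CoroutineScope(Dispatchers.IO))
    queue.addSuggestions(listOf("kotlin", "room"))
    queue.addPin("kotlin")
    queue.closeAndDrain()
    queue.stored.toMap()
}
```

Because the public functions stay non-suspending, callers of the lib are not forced to use Coroutines themselves, while the pin for "kotlin" is still applied after the suggestion list.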

Ahmed I. Khalil answered Sep 07 '25 21:09


By default, when you call .launch{}, it launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a Job. The coroutine is cancelled when the resulting job is cancelled.

It doesn't wait for other parts of your code; it just runs.

But you can pass a start parameter to tell it either to run immediately or to start only when it is needed, for example when another coroutine calls join() on it (LAZY).

For Example:

val work_1 = CoroutineScope(IO).launch(start = CoroutineStart.LAZY) {
    // do some work
}

val work_2 = CoroutineScope(IO).launch(start = CoroutineStart.LAZY) {
    // do some work
    work_1.join() // starts work_1 and waits for it
}

val work_3 = CoroutineScope(IO).launch {
    // do some work
    work_2.join() // starts work_2 and waits for it
}

When you execute the code above, work_3 starts first. When it reaches work_2.join(), work_2 is started, which in turn starts work_1 when it reaches work_1.join(), and so on.
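The start order of such a chain can be checked with a small runnable sketch. It assumes Dispatchers.Default instead of the IO context from the question, and lazyChainOrder is a hypothetical helper that records when each body begins.

```kotlin
import java.util.Collections
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order in which the three coroutine bodies begin running.
fun lazyChainOrder(): List<String> = runBlocking {
    val order = Collections.synchronizedList(mutableListOf<String>())

    // LAZY: created but not started until someone calls start() or join().
    val work1 = launch(Dispatchers.Default, start = CoroutineStart.LAZY) {
        order.add("work_1")
    }

    val work2 = launch(Dispatchers.Default, start = CoroutineStart.LAZY) {
        order.add("work_2")
        work1.join() // starts work1, then waits for it
    }

    // DEFAULT: scheduled right away.
    val work3 = launch(Dispatchers.Default) {
        order.add("work_3")
        work2.join() // starts work2, then waits for it
    }

    work3.join()
    order.toList()
}
```

Each lazy coroutine only begins once the one before it calls join() on it, so the bodies start in the order work_3, work_2, work_1, and work_3 is the last to complete.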

The summary of coroutine start options is:

DEFAULT -- immediately schedules coroutine for execution according to its context

LAZY -- starts coroutine lazily, only when it is needed

ATOMIC -- atomically (in a non-cancellable way) schedules coroutine for execution according to its context

UNDISPATCHED -- immediately executes coroutine until its first suspension point in the current thread.

So when you call .launch{} without a start argument, start = CoroutineStart.DEFAULT is used, because it is the parameter's default value.
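A short sketch of the difference between DEFAULT and UNDISPATCHED, assuming everything runs inside runBlocking on a single thread. startModeOrder is a hypothetical helper that records the order of events.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun startModeOrder(): List<String> = runBlocking {
    val order = mutableListOf<String>()

    // DEFAULT: the coroutine is only scheduled here; its body runs
    // once the runBlocking coroutine suspends (at join below).
    val default = launch { order.add("default body") }
    order.add("after default launch")
    default.join()

    // UNDISPATCHED: the body runs immediately in the current thread
    // until its first suspension point (yield), then resumes later.
    val undispatched = launch(start = CoroutineStart.UNDISPATCHED) {
        order.add("undispatched before suspension")
        yield()
        order.add("undispatched after suspension")
    }
    order.add("after undispatched launch")
    undispatched.join()

    order
}
```

With DEFAULT the caller continues before the body runs, while with UNDISPATCHED the first part of the body runs before launch returns to the caller.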

ObinasBaba answered Sep 07 '25 20:09