I tried to write a wrapper for BillingClient v.2.2.0 with Kotlin Coroutines:
package com.cantalk.photopose.billing
import android.app.Activity
import android.content.Context
import com.android.billingclient.api.*
import com.android.billingclient.api.BillingClient.*
import com.cantalk.photopose.util.Logger
import kotlinx.coroutines.CompletableDeferred
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
class BillingClientAsync(context: Context) {
    private val billingClient: BillingClient = setupBillingClient(context)
    private val pendingPurchaseFlows = HashMap<String, CompletableDeferred<Purchase>>()
    private fun setupBillingClient(context: Context): BillingClient {
        return newBuilder(context)
            .enablePendingPurchases()
            .setListener { billingResult, purchases ->
                if (billingResult.responseCode == BillingResponseCode.OK && purchases != null) {
                    for (purchase in purchases) {
                        val deferred = pendingPurchaseFlows.remove(purchase.sku)
                        deferred?.complete(purchase)
                    }
                } else {
                    val iterator = pendingPurchaseFlows.iterator()
                    while (iterator.hasNext()) {
                        val entry = iterator.next()
                        entry.value.completeExceptionally(BillingException(billingResult))
                        iterator.remove()
                    }
                }
            }
            .build()
    }
    suspend fun queryPurchases(): List<Purchase> {
        Logger.debug("query purchases")
        ensureConnected()
        val queryPurchases = billingClient.queryPurchases(SkuType.INAPP)
        if (queryPurchases.responseCode == BillingResponseCode.OK) {
            return queryPurchases.purchasesList
        } else {
            throw BillingException(queryPurchases.billingResult)
        }
    }
    suspend fun querySkuDetails(@SkuType type: String, skus: List<String>): List<SkuDetails> {
        Logger.debug("query sku details for", type)
        ensureConnected()
        return suspendCoroutine { continuation ->
            val params = SkuDetailsParams.newBuilder()
                .setType(type)
                .setSkusList(skus)
                .build()
            billingClient.querySkuDetailsAsync(params) { billingResult, skuDetailsList ->
                if (billingResult.responseCode == BillingResponseCode.OK) {
                    continuation.resume(skuDetailsList)
                } else {
                    continuation.resumeWithException(BillingException(billingResult))
                }
            }
        }
    }
    suspend fun purchase(activity: Activity, skuDetails: SkuDetails): Purchase {
        Logger.debug("purchase", skuDetails.sku)
        ensureConnected()
        val currentPurchaseFlow = CompletableDeferred<Purchase>()
            .also { pendingPurchaseFlows[skuDetails.sku] = it }
        val params = BillingFlowParams.newBuilder()
            .setSkuDetails(skuDetails)
            .build()
        billingClient.launchBillingFlow(activity, params)
        return currentPurchaseFlow.await()
    }
    suspend fun consume(purchase: Purchase): String {
        Logger.debug("consume", purchase.sku)
        ensureConnected()
        return suspendCoroutine { continuation ->
            val params = ConsumeParams.newBuilder()
                .setPurchaseToken(purchase.purchaseToken)
                .setDeveloperPayload("TBD")
                .build()
            billingClient.consumeAsync(params) { billingResult, purchaseToken ->
                if (billingResult.responseCode == BillingResponseCode.OK) {
                    continuation.resume(purchaseToken)
                } else {
                    continuation.resumeWithException(BillingException(billingResult))
                }
            }
        }
    }
    suspend fun acknowledgePurchase(purchase: Purchase) {
        Logger.debug("acknowledge", purchase.sku)
        ensureConnected()
        return suspendCoroutine { continuation ->
            val params = AcknowledgePurchaseParams.newBuilder()
                .setPurchaseToken(purchase.purchaseToken)
                .setDeveloperPayload("TBD")
                .build()
            billingClient.acknowledgePurchase(params) { billingResult ->
                if (billingResult.responseCode == BillingResponseCode.OK) {
                    continuation.resume(Unit)
                } else {
                    continuation.resumeWithException(BillingException(billingResult))
                }
            }
        }
    }
    private suspend fun ensureConnected() {
        if (!billingClient.isReady) {
            startConnection()
        }
    }
    private suspend fun startConnection() {
        Logger.debug("connect to billing service")
        return suspendCoroutine { continuation ->
            billingClient.startConnection(object : BillingClientStateListener {
                override fun onBillingSetupFinished(billingResult: BillingResult) {
                    if (billingResult.responseCode == BillingResponseCode.OK) {
                        continuation.resume(Unit)
                    } else {
                        // TODO: 3 Google Play In-app Billing API version is less than 3
                        continuation.resumeWithException(BillingException(billingResult))
                    }
                }
                override fun onBillingServiceDisconnected() = Unit
            })
        }
    }
}
As you can see, when I try to query purchases or purchase I ensure that client is ready. But in production there are many errors:
java.lang.IllegalStateException: 
  at kotlin.coroutines.SafeContinuation.resumeWith (SafeContinuation.java:2)
  at com.cantalk.photopose.billing.BillingClientAsync$startConnection$2$1.onBillingSetupFinished (BillingClientAsync.java:2)
  at com.android.billingclient.api.zzai.run (zzai.java:6)
I tried to understand what the cause of problem and got that if BillingClientStateListener.onBillingSetupFinished will be called multiple time there can be an exception IllegalStateException: Already resumed. I've wondered how is it possible, because I am creating new listener every startConnection call? I can't reproduce this issue on emulator or my test device. Can anybody explain me what does happen here and how to fix it?
↳ com.android.billingclient.api.BillingClient. Main interface for communication between the library and user application code. It provides convenience methods for in-app billing. You can create one instance of this class for your application and use it to process in-app billing operations.
Note that onBillingServiceDisconnected will be called when there's a connection, but it gets lost. You can test it by clearing Google Play's data while your app is open. If you don't retry at this point the connection will be lost.
It provides synchronous (blocking) and asynchronous (non-blocking) methods for many common in-app billing operations. It's strongly recommended that you instantiate only one BillingClient instance at one time to avoid multiple PurchasesUpdatedListener.onPurchasesUpdated (BillingResult, List) callbacks for a single event.
Initiates the billing flow for an in-app purchase or subscription. Initiates a flow to confirm the change of price for an item subscribed by the user. Constructs a new BillingClient.Builder instance. Returns the most recent purchase made by the user for each SKU, even if that purchase is expired, canceled, or consumed. This method is deprecated.
To perform setup, call the startConnection (BillingClientStateListener) method and provide a listener; that listener will be notified when setup is complete, after which (and not before) you may start calling other methods. After setup is complete, you will typically want to request an inventory of owned items and subscriptions.
It's strongly recommended that you instantiate only one BillingClient instance at one time to avoid multiple PurchasesUpdatedListener.onPurchasesUpdated (BillingResult, List) callbacks for a single event. All methods annotated with AnyThread can be called from any thread and all the asynchronous callbacks will be returned on the same thread.
I tried to do the same at first, but the rationale is not correct. onBillingSetupFinished() might be called more than once, by design. Once you call BillingClient.startConnection(BillingClientStateListener) with the callback, it stores the callback internally and calls it again if connection is dropped/regained. You shouldn't pass in a new object on other calls to BillingClient.startConnection(BillingClientStateListener).
Read the documentation on onBillingServiceDisconnected():
Called to notify that the connection to the billing service was lost.
Note: This does not remove the billing service connection itself - this binding to the service will remain active, and you will receive a call to onBillingSetupFinished(BillingResult) when the billing service is next running and setup is complete.
This means that when the connection is dropped and then later regained, onBillingSetupFinished(BillingResult) will be called again, and, in your implementation, you will try to resume the coroutine again, but the coroutine continuation has already been resumed and you will get an IllegalStateException.
What I ended up doing is implementing the BillingClientStateListener interface in the class itself, and on the callbacks I update a SharedFlow<Int> with the BillingResult from onBillingSetupFinished(BillingResult)
private val billingClientStatus = MutableSharedFlow<Int>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
override fun onBillingSetupFinished(result: BillingResult) {
    billingClientStatus.tryEmit(result.responseCode)
}
override fun onBillingServiceDisconnected() {
    billingClientStatus.tryEmit(BillingClient.BillingResponseCode.SERVICE_DISCONNECTED)
}
Then, you can collect the flow to fetch your SKU prices or handle pending purchases if billing client is connected, or implement a retry logic if it isn't:
init {
    billingClientStatus.tryEmit(BillingClient.BillingResponseCode.SERVICE_DISCONNECTED)
    lifecycleOwner.lifecycleScope.launchWhenStarted {
        billingClientStatus.collect {
            when (it) {
                BillingClient.BillingResponseCode.OK -> with (billingClient) {
                    updateSkuPrices()
                    handlePurchases()
                }
                else -> billingClient.startConnection(this)
            }
        }
    }
}
And if you are doing some operation that requires billing client connection, you can wait for it by doing something like:
private suspend fun requireBillingClientSetup(): Boolean =
    withTimeoutOrNull(TIMEOUT_MILLIS) {
        billingClientStatus.first { it == BillingClient.BillingResponseCode.OK }
        true
    } ?: false
(Note that I used SharedFlow<T> and not StateFlow<T> for billingClientStatus: the reason is StateFlow<T> does not support emitting consecutive equal values).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With