Coroutines and Flow in Android apps

David Geček

When working on an Android app, we usually want to isolate ugly, Android-specific stuff. Clean architecture helps us here. Application-specific business rules are at the center of the project, not having anything to do with the used technology. Everything else is an implementation detail.

As this onion architecture has a lot of layers, it’s important to make it easy for the data to flow throughout the layers. For this, Android developers usually use Rx — a really powerful tool for reactive programming. Unfortunately, it’s often used only to switch something to the background thread. Operations that were previously started with Singles and Completables (one-time actions where Rx is used only for easy thread switching) can indeed be replaced with coroutines. When it comes to Rx Observables, or Flowables, (true reactive programming, where data is observed for some period of time), Jetbrains came with the alternative — Flow.

There were a lot of discussions among the Android developers about “coroutines vs Rx”, which is wrong — they are made for different usages. We distinguish between one-time actions when we use coroutines and observing data when we use Flow.

One-time actions

Kotlin provides coroutine support at the language level. It’s a really easy way to multitask, suspend and resume tasks.

Scopes make it easy to avoid shooting yourself in the foot. When starting a coroutine, start it in scope, and all the tasks will be canceled when canceling the scope. Android Architecture Components define ViewModelScope and LifecycleScope, but you can also make your own scope. ViewModelScope will be canceled when ViewModel is cleared, and LifecycleScope is tied to the lifecycle owner, like Fragment:

viewModelScope.launch {

   val user = getUserUseCase.execute(userId)

}

ViewModelScope.launch will be run on the main thread by default, so don’t forget to switch long-running tasks to different dispatchers in the repository or use case. These functions should be marked with a suspend keyword. Every suspend function can change the dispatcher with withContext(dispatcher):

override suspend fun execute(userId: String): User? =

   withContext(Dispatchers.Default) {

       // calculation

   }

Suspending functions are very powerful. Function execute will be run on Default dispatcher. Assigning the result to the user value will be done after the calculation, but the main thread (on which getting the user is started) will not be blocked but rather suspended. The main thread will be available for other stuff until the user is returned.

Functionality withContext() can also be used just inside the viewModelScope.launch, but the view model shouldn’t know about the implementation of fetching the user. It’s better than suspend functions in the domain and data layer to decide how and on which dispatcher the value is fetched or calculated. This way every suspend function takes care of the dispatcher for its own code.

Dispatchers.IO is usually used for fetching the data from networks, databases, etc. — operations that are not CPU intensive. It is backed by a shared pool of 64 threads.

Dispatchers.Default should be used for calculations or CPU-intensive tasks. It’s limited by the shared pool of threads and the size of the CPU cores count.

You can also return to Dispatchers.Main (Android main thread), or use Dispatchers.Unconfined which doesn’t change the thread. It starts on the current thread, but if this function starts another suspend function which changes the Dispatcher, it will just resume on that Dispatcher.

If you need multiple results from different sources, you can start multiple coroutines with async:

value1Deferred = viewModelScope.async {

   // …

}

value2Deferred = viewModelScope.async {

   // …

}

val user = User(value1Deferred.await(), value2Deferred.await())

This will start 2 coroutines, one for each value fetch. If coroutines are switched each to its thread (and not left on Dispatchers.Main), the creation of the user object will take as much time as it’s needed for the longest value fetch operation. E.g., if value1 fetch takes 3 seconds, and value2 fetch takes 5 seconds, the User will be created after about 5 seconds. If both operations would have been started in one coroutine, it would take 8 seconds!

Every coroutine can “split” itself this way, without the coroutine starter’s knowledge.

Both launch and async return Job with which you can check if it’s active, completed or canceled. You can use this if you don’t want to start some coroutine with some other coroutine active.

Observing

Coroutines are nice for one-time actions when you need to fetch or calculate something only once, and that’s it! What about observing?

Although Kotlin Flow lacks some of the powerful set of Rx operators, it’s harder to leave something unmanaged. Let’s take a look at the example:

getUserUseCase.execute()

   .debounce(DEBOUNCE_TIME)

   .collectLatest { user->

       doSomethingWithUser(user)

   }

Function execute() will return Flow<User>. We can apply multiple operators on top of that, like debounce, which will filter out values that are followed by newer values in said time. Most of the lacking operators can easily be written as Flow extension functions and can be found as open-source functions on GitHub or blogs.

After putting all the operators we want, we can subscribe to events with collect() or collectLatest(). The latter will cancel the previous value block when the new value comes. Now, if you put this piece of code just in the view model or presenter method, it will not compile. The reason is that collect() is a suspend function and it will force you to start the Flow tied to the CoroutineScope. This way, you will stay subscribed just as long as the scope is alive — e.g., just as long as the view model is alive. No babysitting:

viewModelScope.launch {

   getUserUseCase.execute()

       .debounce(DEBOUNCE_TIME)

       .collectLatest { user ->

           doSomethingWithUser(user)

       }

   }

Just like with coroutines, viewModelScope.launch() will launch Flow on the main dispatcher, and just like with coroutines, let’s switch it to wanted Dispatcher in the domain or data layer:

buildFlow()

   .map { mapToSomething(it) }

   .flowOn(Dispatchers.Default)

FlowOn will affect all the preceding operators until another .flowOn, so this function can again define the dispatcher for its operators, and buildFlow() can have its own .flowOn for its set of operators. If we put it in one method, it would work like this:

.filter { … }               // Will be executed in IO

.flowOn(Dispatchers.IO)

.map { mapToSomething(it) } // Will be executed in Default

.flowOn(Dispatchers.Default)

So where do Flows originate from? How to “make” them?

Well, there are few ways.

You can use flow builders:

flowOf(1, 2, 3)

You can also use channels, which are similar to Subjects, or Processors, in RxJava:

private val usernameChannel = ConflatedBroadcastChannel<String>()

usernameChannel

   .asFlow()

   .collect{}

Some libraries also allow you to observe data via Flow — like Room, where you can write something like this in Dao interface:

@Query("SELECT * FROM user")

fun getUsers(): Flow<List<User>>

Conclusion

In conclusion, coroutines and Flow seem like very powerful, yet easy-to-use tools. If you are coming from Rx world, you’ll need some time to adapt to a little bit different way of thinking. If you are unfamiliar with both Rx and coroutines/Flow, the latter seems a lot easier to grasp.

If you already have Rx in your project, you can start working with coroutines and Flow on the new screens. You can also transform existing Rx use cases to Flow use cases with Publisher<T>.asFlow() extension function available in kotlinx.coroutines.reactive. Use cases can now return Flow without touching the bottom Rx layers.

Always on a lookout for new talent @ Njuškalo

Apply Today