Reactive Programming in Kotlin: A Step-by-Step Guide

In this article, we are going to tackle Reactive Programming in Kotlin. We will start with what exactly Reactive Programming is, how we can use it to our advantage, and what techniques we can use in Kotlin. I am going to explain everything in simple terms to make this article beginner-friendly (assuming you know the basics of Kotlin, of course).

In future articles, we will put that knowledge into action in a Ktor project, with an emphasis on the Reactive approach. So don’t forget to follow me on LinkedIn so you don’t miss out.

Reactive Programming

If you try to search for what Reactive Programming is, you will sooner or later find The Reactive Manifesto, which is cited by almost everyone.

And in most articles, there will be something like the following diagram:

For a beginner, all of these definitions and explanations are vague and unclear. It is entirely possible to memorize them and recite them, for example in an interview, with zero real understanding. Instead, let’s connect Reactive Programming to concepts that already exist in software development, and don’t worry if you are not familiar with them.

Let’s take a look at another diagram that represents the concepts:

The key concepts in the Observer Pattern are “subjects” and “observers”. When a subject’s state changes, it notifies all its observers, allowing them to react accordingly.
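To make the subject/observer relationship concrete, here is a minimal sketch in plain Kotlin with no libraries. The names `Observer` and `TemperatureSensor` are hypothetical and exist only for illustration: the sensor plays the subject and notifies every subscribed observer whenever its state changes.

```kotlin
// Hypothetical observer contract: called whenever the subject's state changes
fun interface Observer<T> {
    fun onChanged(value: T)
}

// Hypothetical subject: keeps a list of observers and notifies all of them
class TemperatureSensor {
    private val observers = mutableListOf<Observer<Double>>()

    var temperature: Double = 0.0
        set(value) {
            field = value
            // One-to-many notification: every observer hears about the change
            observers.forEach { it.onChanged(value) }
        }

    fun subscribe(observer: Observer<Double>) {
        observers.add(observer)
    }
}

fun main() {
    val sensor = TemperatureSensor()
    sensor.subscribe { println("Display shows: $it") }
    sensor.subscribe { if (it > 30.0) println("Alert: too hot!") }

    sensor.temperature = 25.0
    sensor.temperature = 35.0
}
```

Note that the subject knows nothing about what its observers do with the value; it only promises to tell them about every change.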

The Iterator Pattern introduces a way to access elements of some object, most likely a collection, sequentially without exposing its internal implementation.
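As a rough illustration of the Iterator Pattern, the sketch below uses a hypothetical `Playlist` class. Callers can walk through its tracks one by one, but they never see how the tracks are stored internally.

```kotlin
// Hypothetical collection that hides how its tracks are stored
class Playlist(vararg tracks: String) : Iterable<String> {
    private val storage = tracks.toList() // internal detail, never exposed

    override fun iterator(): Iterator<String> = object : Iterator<String> {
        private var index = 0

        override fun hasNext(): Boolean = index < storage.size

        override fun next(): String {
            if (!hasNext()) throw NoSuchElementException()
            return storage[index++]
        }
    }
}

fun main() {
    val playlist = Playlist("Intro", "Verse", "Outro")
    // The for loop only relies on hasNext()/next(), not on the storage
    for (track in playlist) println(track)
}
```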

And as a cherry on top, Reactive Programming is closely tied to Functional Programming. So, in essence, let’s define what Reactive Programming is in these terms: it leverages the Observer’s one-to-many notification and the Iterator’s sequential access, but extends them to handle continuous data flows and asynchronous interactions, letting us declare “what” to do with the data rather than “how” to iterate over it.

Streams

Everything mentioned so far leads us to another core concept: Streams. If you are familiar with Flows, a Stream is basically the same thing; if not, we’ll cover them later anyway. So, a Stream is a sequence of data objects that can be consumed and processed asynchronously. What is more, you can also merge, filter, combine, and transform streams to shape the data the way you need.
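Merging, filtering, and transforming can be sketched with the `Flow` operators from kotlinx.coroutines. This is only an illustration under assumed data: the function `largeOrdersWithShipping`, the two sources, and the amounts (in cents) are all made up for the example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical data: order totals in cents coming from two sources
fun largeOrdersWithShipping(): Flow<Int> {
    val webOrders = flowOf(1_000, 25_000)
    val mobileOrders = flowOf(12_000, 500)

    return merge(webOrders, mobileOrders) // merge both sources into one stream
        .filter { it >= 10_000 }          // keep only large orders
        .map { it + 499 }                 // transform: add a flat shipping fee
}

fun main() = runBlocking {
    // Nothing is processed until the stream is collected here
    largeOrdersWithShipping().collect { println("Large order total: $it") }
}
```

The pipeline only describes what should happen to each value; the order in which values from the two merged sources arrive is not something the example relies on.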

Now let’s check what Kotlin can offer in terms of the Reactive Approach. We will start with asynchronicity, and the best solution for this is Coroutines and especially Flows. But before that, let’s discuss types of streams.

“Cold” and “Hot” Streams

In terms of emitting data, we have two types of streams. On the one hand, “cold” streams only emit data when there is someone to consume it, in other words, when an observer is attached. This basically means that no data is missed and every observer consumes the whole chain of items.

On the other hand, we have “hot” streams. They do not depend on any observer being attached and start emitting items as soon as they are created. And here is the key difference: you can “miss” values with “hot” streams if they are not handled carefully.
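The difference can be sketched with the Flow types from kotlinx.coroutines that the following sections introduce. The helper `lateHotObserver` and the 200 ms timeout are assumptions made for the demo: a `flow { }` builder stands in for a cold stream, and a `MutableSharedFlow` without a replay cache stands in for a hot one.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: emits into a hot stream before anyone observes it,
// then reports what a late observer receives within 200 ms (null = nothing).
suspend fun lateHotObserver(): String? {
    val hot = MutableSharedFlow<String>() // no replay cache by default
    hot.emit("early value")               // nobody is observing, so it is dropped
    return withTimeoutOrNull(200) { hot.first() }
}

fun main() = runBlocking {
    var producerRuns = 0

    // Cold: this block does not run until someone collects the flow,
    // and it runs again from the start for every new observer.
    val cold = flow {
        producerRuns++
        emit("A")
        emit("B")
    }
    println("Producer runs before collecting: $producerRuns")
    println("First observer got: ${cold.toList()}")
    println("Second observer got: ${cold.toList()}")
    println("Producer runs after two observers: $producerRuns")

    // Hot: the early value was emitted with no observer attached and is gone
    println("Late observer got: ${lateHotObserver()}")
}
```

Both cold observers receive the full sequence, while the late observer of the hot stream times out and gets nothing.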

Kotlin Coroutines

First, a quick dive into Coroutines basics. Coroutines allow us to write asynchronous code, much like threads do, but they are not bound to any particular thread and do not need callbacks.

Now for the real part: for Reactive Programming, the Coroutines library offers three options:
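As a small sketch of what this looks like in practice, the example below runs two slow operations concurrently. The functions `loadUserName`, `loadOrderCount`, and `loadGreeting` are hypothetical stand-ins for real I/O; note that the code reads top to bottom with no callbacks.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending functions standing in for slow I/O
suspend fun loadUserName(): String {
    delay(500) // suspends the coroutine without blocking the thread
    return "Alice"
}

suspend fun loadOrderCount(): Int {
    delay(500)
    return 3
}

// Starts both calls concurrently, then combines their results
suspend fun loadGreeting(): String = coroutineScope {
    val name = async { loadUserName() }
    val orders = async { loadOrderCount() }
    "${name.await()} has ${orders.await()} orders"
}

fun main() = runBlocking {
    println(loadGreeting())
}
```

Because both `async` blocks start before either result is awaited, the two delays overlap instead of adding up.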

Channels

StateFlow

SharedFlow

Both StateFlow and SharedFlow are evolutions of Flow, which is essentially the Stream I mentioned before. A Flow represents a sequence of values that can be computed asynchronously. But first, let’s start with Channels.

Channels

The first notable object is Channel. As the documentation states, a Channel provides communication between coroutines. But in terms of our topic, a Channel is a “hot” Stream in which each individual value can be consumed by only one observer.

Nevertheless, it can have more than one observer, but each value is delivered to the first one waiting for it. If many observers are waiting for a value, the others stay suspended until more values arrive. Now let’s check a simple code example:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun fetchStockPrice(): Double {
    delay(1000) // Simulate API call delay
    return Math.random() * 100
}

fun main() = runBlocking {
    // Create a Channel to hold stock prices
    val stockPrices = Channel<Double>()

    // Launch a coroutine to fetch a price, send it, and pause a second before the next fetch
    launch {
        while (isActive) {
            val price = fetchStockPrice()
            stockPrices.send(price)
            delay(1000)
        }
    }

    // Launch a coroutine to consume and display prices
    launch {
        for (price in stockPrices) {
            println("Current Stock Price: \$$price")
        }
    }

    // Keep main suspended; the ticker runs until the process is stopped
    delay(Long.MAX_VALUE)
}

This example simulates a stock price ticker application that fetches prices from an API in real time and displays them to the user.

We used Channel to handle the asynchronous data flow and transfer data from one Coroutine to another!
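To see the “one value, one observer” rule in action, here is a small fan-out sketch. The function `fanOut` and the consumer names "A" and "B" are hypothetical: two consumers read from the same Channel, and every value ends up with exactly one of them.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends 1..count into one channel read by two consumers and
// returns what each (hypothetically named) consumer received.
suspend fun fanOut(count: Int): Map<String, List<Int>> {
    val channel = Channel<Int>()
    val received = mapOf("A" to mutableListOf<Int>(), "B" to mutableListOf<Int>())

    coroutineScope {
        // Start one consumer per list; both read from the same channel
        received.values.forEach { values ->
            launch {
                for (value in channel) values.add(value)
            }
        }
        // Producer: send all values, then close so the consumer loops end
        launch {
            for (i in 1..count) channel.send(i)
            channel.close()
        }
    }
    return received
}

fun main() = runBlocking {
    fanOut(6).forEach { (name, values) -> println("Consumer $name received $values") }
}
```

How the values are split between the two consumers depends on scheduling, but taken together they receive each value exactly once, with no duplicates.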

SharedFlow

The SharedFlow is a Stream in all of its glory.

It is a hot stream that can have any number of observers. We can check it out with the following code example:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Item(val name: String, val price: Double)

fun main() = runBlocking {
    // SharedFlow to hold the cart items
    val cartItems = MutableSharedFlow<List<Item>>()

    // Launch a coroutine that prints the cart on every update
    launch {
        cartItems.collect { items ->
            println("Cart Updated: $items")
        }
    }

    // Second observer: trigger order confirmation when the cart is full.
    // launchIn starts its own coroutine, so no extra launch wrapper is needed.
    cartItems
        .filter { it.size >= 3 } // Filter for 3 or more items
        .onEach { println("Order Confirmation triggered!") }
        .launchIn(this)

    // Launch a coroutine to simulate adding items to the cart
    launch {
        var updatedItems = listOf(Item("Apple", 1.50))
        cartItems.emit(updatedItems)
        delay(1000)
        updatedItems = updatedItems + Item("Milk", 2.00)
        cartItems.emit(updatedItems)
        delay(1000)
        updatedItems = updatedItems + Item("Bread", 3.00)
        cartItems.emit(updatedItems)
    }

    // Keep main suspended; the observers run until the process is stopped
    delay(Long.MAX_VALUE)
}

In the example above, we created a shopping cart application where multiple components need to react to changes in the cart items.

SharedFlow provides a centralized way to share updates efficiently, and we can see multiple observers reacting to the same stream at the same time.

StateFlow

The StateFlow is similar to SharedFlow and is also a hot stream. However, from an observer’s point of view it behaves somewhat like a cold one, because it requires an initial value on creation and always holds a value.

This basically means that it always has a value to observe. Here’s a good example:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

enum class PlayerState {
    Playing,
    Paused,
    Stopped
}

fun main() = runBlocking {
    // StateFlow to hold the current player state
    val playerState = MutableStateFlow(PlayerState.Stopped)

    // Launch a coroutine to print every state held by the StateFlow
    launch {
        playerState.collect { state ->
            println("Player State: $state")
        }
    }

    // Launch a coroutine to simulate user actions
    launch {
        // Play/pause/stop actions update the state
        playerState.emit(PlayerState.Playing)
        delay(2000)
        playerState.emit(PlayerState.Paused)
        delay(1000)
        playerState.emit(PlayerState.Stopped)
    }

    // Keep main suspended; the observer runs until the process is stopped
    delay(Long.MAX_VALUE)
}

For this example, we use a music player application where the current playing state needs to be tracked. StateFlow provides a single source of truth for this state, with an initial value.
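The “always holds a value” property can be sketched in a few lines. The helper `lateObserverSees` and the volume numbers are made up for illustration: the state is updated before anyone observes it, and a late observer still immediately receives the latest value.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper: applies updates while nobody is observing,
// then returns the first value a late observer receives.
suspend fun lateObserverSees(initial: Int, vararg updates: Int): Int {
    val volume = MutableStateFlow(initial) // an initial value is mandatory
    for (update in updates) {
        volume.value = update // only the latest state is kept
    }
    return volume.first() // a new observer immediately gets the current state
}

fun main() = runBlocking {
    val volume = MutableStateFlow(50)
    // The current state can be read at any time, even with no observers
    println("Initial volume: ${volume.value}")

    println("Late observer sees: ${lateObserverSees(50, 60, 70)}")
}
```

Intermediate values (60 in this sketch) are simply overwritten, which is the behavior you want for state, but not for events where every item matters.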

Other Reactive options available

The most popular alternatives are the ReactiveX libraries. The main competitor is RxJava, built primarily for Java and widely used, especially in Android development. Unfortunately, RxKotlin has almost zero popularity these days because RxJava itself is so widespread.

RxJava provides extensive tools for Reactive programming. However, it connects to Kotlin Coroutines only through bridge modules (such as kotlinx-coroutines-rx3), so I recommend using Coroutines directly, but keep in mind that this powerful tool is also available.

There are also libraries tightly connected to Spring Boot, such as Spring WebFlux and Project Reactor. They provide tools for building reactive systems in JVM backend development, but they are not that popular in Kotlin.

Summary

And that’s all for this article about Reactive Programming in Kotlin with Coroutines and Flows.

In the upcoming articles, we will get back to this topic and learn how to apply this knowledge with Ktor, so don’t forget to join the free newsletter to not miss it!

Lastly, if you would like to learn more about Flows, you can find lots of useful information in the official Kotlin documentation.

The post Reactive Programming in Kotlin: A Step-by-Step Guide appeared first on Codersee | Kotlin, Ktor, Spring.
