From Reactor to Coroutines

From Reactor to Coroutines

Last September, I wrote how to migrate from an Imperative Programming codebase to a Reactive Programming one in a step-by-step process. Because of its popularity, I illustrated the post with a Spring Boot demo. The final result uses Mono and Flux from Project Reactor. I also made sure that no step in the processing pipeline is blocking thanks to Blockhound.

I wrote the code in Java so that it could be accessible to the largest audience. But I'm still a Kotlin fan at heart. Hence, I ported the codebase to Kotlin.

But Kotlin brings more benefits to the table. In the context of reactive programming, it offers coroutines. I wanted to showcase them, but I found it harder than expected migrating from Project Reactor to coroutines. This is what I want to explain in this post.

The starting point

The following code is our starting point:

class Person                                                         // 1

interface PersonRepository : ReactiveSortingRepository<Person, Long> // 2

@Configuration
class PersonRoutes {

  @Bean
  fun router(repository: PersonRepository) = router {
    val handler = PersonHandler(repository)
    GET("/person", handler::getAll)
  }
}

class PersonHandler(private val repository: PersonRepository) {
  fun getAll(req: ServerRequest): Mono<ServerResponse> {
    val flux = repository.findAll(Sort.by("lastName", "firstName"))  // 3
    return ok().body<Person>(flux)                                   // 4
  }
}
  1. The entity. The exact properties are not relevant to this post
  2. Spring Data R2DBC repository. It offers generic functions that return reactive types e.g. Mono<Person> and Flux<Person>
  3. At runtime, Spring provides an implementation
  4. Wraps the Flux<Person> into a Mono<ServerResponse

Coroutines and reactive types are not compatible. At compile-time, the return type of suspended functions is changed from T to Defered<T>. Hence, migrating to coroutines requires to change the signature of functions in two ways:

  1. Add the suspend keyword
  2. Change the return type from a reactive type Flux<T> (or Mono<T>) to a standard type List<T> (or T)

Migrating the repository

The Coroutines Reactor module allows bridging from deferred types to reactive types with the Deferred.asMono() extension function. But I found no way to bridge the other way around, from Reactor to coroutines. Because the repository returns reactive types, the whole processing chain needs to use them. The issue becomes how to make the repository coroutine-compatible. For that, Spring Data offers the CoroutineSortingRepository<T, ID> interface.

We need another dependency:

<dependency>
  <groupId>org.jetbrains.kotlinx</groupId>
  <artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>

The code becomes:

interface PersonRepository : CoroutineCrudRepository<Person, Long>

Migrating the handler

This change makes all functions from the repository suspending. We need to propagate this upwards to the handler:

class PersonHandler(private val repository: PersonRepository) {
  suspend fun getAll(req: ServerRequest): ServerResponse {          // 1-2
    val flow = repository.findAll(Sort.by("lastName", "firstName"))
    return ok().bodyAndAwait(flow)                                  // 3
  }
}
  1. Add the suspend keyword
  2. Return ServerResponse instead of Mono<ServerResponse.
  3. Replace the body() function by the bodyAndAwait() extension function. This is necessary because all standard functions return reactive types.

Migrating the routes

Finally, we need to update the routes accordingly:

@Configuration
class PersonRoutes {

  @Bean
  fun router(repository: PersonRepository) = coRouter {  // 1
    val handler = PersonHandler(repository)
    GET("/person", handler::getAll)
  }
}
  1. Change from router to coRouter

Bonus: migrating JDK types

I simplified the starting point above. The original code introduces caching between the repository and the handler via Hazelcast.

class CachingService(
  private val cache: IMap<Long, Person>,
  private val repository: PersonRepository
) {
  fun findById(id: Long) = Mono.fromCompletionStage { cache.getAsync(id) } // 1-2
    .switchIfEmpty(                                                        // 3
      repository.findById(id)
        .doOnNext { cache.putAsync(it.id, it) }                            // 4
    )
}
  1. Hazelcast offers an asynchronous API that returns a CompletionStage
  2. Mono allows to bridge from CompletionStage to Mono
  3. If the cache doesn't contain the entity, switch to another branch
  4. When found, put the entity in the cache for later use

To bridge from CompletionStage<T> to Deferred<T>, we need one more dependency:

<dependency>
  <groupId>org.jetbrains.kotlinx</groupId>
  <artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>

With it comes the await() extension function. The code becomes:

class CachingService(
  private val cache: IMap<Long, Person>,
  private val repository: PersonRepository
) {
  suspend fun findById(id: Long) = cache.getAsync(id).await()    // 1
    ?: repository.findById(id)?.also { cache.putAsync(it.id, it)
}
  1. Transform the CompletionStage<Person> type to the Deferred<Person> type

As an added benefit, we can discard the specific Reactive Programming model to a more traditional Imperative one while keeping the code reactive.

Conclusion

Reactor types and coroutines aim to achieve the same goal but use different paths for that. For that reason, their types are not compatible. Migrating to suspending functions is a bottom-up process. For that, Spring Data R2DBC offers the CoroutineCrudRepository. Once done, the rest is pretty straightforward.

The complete source code for this post can be found on Github.

Go further:

Originally published at A Java Geek on November 1st 2020