1. Spring WebFlux Overview

Spring Framework 5 includes a new WebFlux module for supporting reactive applications. If we want systems that are Responsive, Resilient, Elastic and Message Driven, then we need “Reactive Systems”, and Spring WebFlux is what we are looking for.

It was created to meet the need for a non-blocking web stack that handles concurrency with a small number of threads. The term “reactive” refers to programming models that are built around reacting to change. In this sense, non-blocking is reactive because, instead of being blocked, we react to notifications as operations complete or data becomes available.

Spring WebFlux is fully non-blocking, supports Reactive Streams back pressure, and runs on servers such as Netty, Undertow, and Servlet 3.1+ containers.
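To make back pressure more concrete, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow API (Java 9+), which mirrors the Reactive Streams interfaces. It is not WebFlux or Reactor code; the class and method names (BackPressureSketch, OneAtATimeSubscriber, run) are made up for illustration. The subscriber requests one item at a time, so the publisher can never push more than the subscriber has asked for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Illustrative only: plain JDK Flow API, not Spring WebFlux itself.
final class BackPressureSketch {

    // Subscriber that signals demand for exactly one item at a time.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns the items in the order the subscriber received them.
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once pending items have been delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Reactor's Flux and Mono follow the same request-based contract, although the operators usually manage demand for us.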

Figure: Spring WebFlux reactive web application stack overview

2. About non-blocking

Usually, when a client makes a request to a servlet, I/O on the server is managed by a specific thread taken from a limited pool. If the operation is time-consuming or has some latency, for example because we are calling another service, that thread stays blocked until we get a response. This may not be a problem in a system with a small load, but it can limit performance in one that has to scale. One solution could be to increase the thread pool size, but this is not always a good idea because of the increased resource consumption. In Spring WebFlux, and non-blocking servers in general, it is assumed that applications will not block, and therefore non-blocking servers use a small, fixed-size thread pool (event loop workers) to handle requests.
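The idea can be sketched with plain JDK classes. The example below is only an analogy, not how Netty or WebFlux are implemented: a single worker thread plays the role of the event loop, and callRemote is a hypothetical remote call whose latency is simulated with a timer instead of a blocked thread. All names (NonBlockingSketch, callRemote, handle) are assumptions made for this illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative only: one worker thread serving many in-flight requests.
final class NonBlockingSketch {

    // Hypothetical remote call: the result arrives after latencyMillis,
    // and no thread is held while waiting for it.
    static CompletableFuture<String> callRemote(ScheduledExecutorService loop, int id, long latencyMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();
        loop.schedule(() -> result.complete("response-" + id), latencyMillis, TimeUnit.MILLISECONDS);
        return result;
    }

    // Starts all requests at once and returns the names of the threads that processed the responses.
    static Set<String> handle(int requests, long latencyMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            List<CompletableFuture<String>> pending = IntStream.range(0, requests)
                .mapToObj(i -> callRemote(loop, i, latencyMillis)
                    // the callback is queued on the worker when the response arrives
                    .thenApplyAsync(body -> {
                        threads.add(Thread.currentThread().getName());
                        return body;
                    }, loop))
                .collect(Collectors.toList());
            // Only the caller of this demo waits here; the worker thread never blocks.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return threads;
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        Set<String> threads = handle(100, 100);
        System.out.println("100 requests handled by " + threads.size() + " thread(s)");
    }
}
```

Because no request holds the thread while it waits, the whole batch should finish in roughly the latency of a single call, whereas a blocking design would need one thread per in-flight request to achieve the same.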

3. Data types

Behind the scenes, Spring uses Reactor (https://projectreactor.io/) as the library on which its new APIs are built.

Flux and Mono are the two main types in Reactor. Both are implementations of the Reactive Streams Publisher interface, but Flux produces 0 to N items, while Mono produces 0 to 1 item.
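As a small illustration of the difference, the following sketch assumes reactor-core is on the classpath (it comes with the WebFlux starter). The class name FluxMonoSketch and the sample values are made up. The block() calls are there only to get plain values out for the demonstration and should not be used on an event loop thread.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative only: assumes reactor-core is available.
final class FluxMonoSketch {

    // A Flux can emit any number of items before it completes.
    static List<Integer> fluxItems() {
        return Flux.just(21, 22, 23)
            .map(celsius -> celsius + 1)
            .collectList()
            .block();
    }

    // A Mono emits at most one item ...
    static String monoItem() {
        return Mono.just("22 C").block();
    }

    // ... or completes without emitting anything, in which case block() returns null.
    static String emptyMono() {
        return Mono.<String>empty().block();
    }

    public static void main(String[] args) {
        System.out.println(fluxItems()); // prints [22, 23, 24]
        System.out.println(monoItem());  // prints 22 C
        System.out.println(emptyMono()); // prints null
    }
}
```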

4. The sample system

For testing purposes, suppose we want to create a service that continuously monitors the temperature of an object (in Celsius), in an IoT-style manner.

4.1. Preparations

We’ll set up the following dependency:


<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

For the latest version, you can look here.

4.2. Server part

Now let’s create the controller responsible for generating temperatures:

import java.time.Duration;
import java.util.Random;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class TemperatureController {

    private static final Logger logger = LoggerFactory.getLogger(TemperatureController.class);

    // Streams random temperatures to the client as Server-Sent Events.
    @GetMapping(value = "/temperatures", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Integer> getTemperature() {
        Random r = new Random();
        int low = 0;
        int high = 50;
        // Infinite, lazily evaluated stream of values in [low, high); each value is logged when produced.
        return Flux.fromStream(Stream.generate(() -> r.nextInt(high - low) + low)
            .peek(t -> logger.info(String.valueOf(t))))
            // Emit one element per second.
            .delayElements(Duration.ofSeconds(1));
    }
}

The server part consists of a RestController with a route that responds to GET requests on the “/temperatures” path. Flux is the reactive type we use to send multiple objects to the client. A Flux can be created in many different ways, but for this simple scenario we build it from a Java 8 Stream.

The delayElements method inserts a delay, here one second, before each item is sent to the client.
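As one example of another way to build the same kind of stream, the sketch below uses Flux.generate instead of a Java 8 Stream and keeps the Flux in a plain method so it can be tried without starting a web server. This is an alternative written for illustration, not the code used in the sample project; the names TemperatureFluxSketch, temperatures and firstReadings are assumptions, and the short period is chosen only so the demo finishes quickly.

```java
import java.time.Duration;
import java.util.List;
import java.util.Random;

import reactor.core.publisher.Flux;

// Illustrative only: assumes reactor-core is available.
final class TemperatureFluxSketch {

    // Endless Flux of random temperatures in [0, 50), one element per period.
    static Flux<Integer> temperatures(Duration period) {
        Random r = new Random();
        return Flux.<Integer>generate(sink -> sink.next(r.nextInt(50)))
            .delayElements(period);
    }

    // Takes the first n readings and then cancels the otherwise infinite stream.
    static List<Integer> firstReadings(int n) {
        return temperatures(Duration.ofMillis(10))
            .take(n)
            .collectList()
            .block(); // blocking is acceptable here only because this is a standalone demo
    }

    public static void main(String[] args) {
        System.out.println(firstReadings(5));
    }
}
```

Flux.generate produces a value only when the subscriber asks for one, so the stream stays lazy in the same way as the Stream-based version.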

4.3. Client part

Now let’s write a simple Spring Boot client application that connects to the server and retrieves temperatures from it.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

@SpringBootApplication(scanBasePackages = { "com.mgiglione" })
public class TemperatureClient {

    private static final Logger logger = LoggerFactory.getLogger(TemperatureClient.class);

    // WebClient configured with the base URL of the server.
    @Bean
    WebClient getWebClient() {
        return WebClient.create("http://localhost:8081");
    }

    // Runs at startup: subscribes to the temperature stream and logs each value.
    @Bean
    CommandLineRunner demo(WebClient client) {
        return args -> {
            client.get()
                .uri("/temperatures")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(Integer.class)
                .map(String::valueOf)
                .subscribe(msg -> logger.info(msg));
        };
    }

    public static void main(String[] args) {
        // Start the application with server.port set to 8081.
        new SpringApplicationBuilder(TemperatureClient.class)
            .properties(java.util.Collections.singletonMap("server.port", "8081"))
            .run(args);
    }
}

The client uses the WebClient class to manage the connection with the server. Once the URL and the media type accepted by the client are specified, we can retrieve the Flux and subscribe to it, in this case logging each element to the console.

5. Conclusions

In this article we have seen how to produce and consume a stream of data in a reactive way using the new Spring WebFlux module. All of the code snippets mentioned in the article can be found in my GitHub repository.
If you are looking for a way to write unit and integration tests with Spring Boot 2, you can read my other post.
