Monday, January 20, 2020

Spring WebClient and Java date-time fields

WebClient is Spring Framework's reactive client for making service to service calls.

WebClient has become a go to utility for me, however I unexpectedly encountered an issue recently in the way it handles Java 8 time fields that tripped me up and this post goes into the details.


Happy Path

First the happy path. When using a WebClient, Spring Boot advices a "WebClient.Builder" to be injected into a class instead of the "WebClient" itself and a WebClient.Builder is already auto-configured and available for injection.


Consider a fictitious "City" domain and a client to create a "City". "City" has a simple structure, note that the creationDate is a Java8 "Instant" type:

import java.time.Instant

data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long,
    val creationDate: Instant = Instant.now()
)

The client to create an instance of this type looks like this:

class CitiesClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {
    fun createCity(city: City): Mono<City> {
        val uri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .bodyValue(city)
            .exchange()
            .flatMap { clientResponse ->
                clientResponse.bodyToMono(City::class.java)
            }
    }
}

See how the intent is expressed in a fluent way. The uri and the headers are first being set, the request body is then put in place and the response is unmarshalled back to a "City" response type.


All well and good. Now how does a test look like.

I am using the excellent Wiremock to bring up a dummy remote service and using this CitiesClient to send the request, along these lines:

@SpringBootTest 
@AutoConfigureJson
class WebClientConfigurationTest {

    @Autowired
    private lateinit var webClientBuilder: WebClient.Builder

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @Test
    fun testAPost() {
        val dateAsString = "1985-02-01T10:10:10Z"

        val city = City(
            id = 1L, name = "some city",
            country = "some country",
            pop = 1000L,
            creationDate = Instant.parse(dateAsString)
        )
        WIREMOCK_SERVER.stubFor(
            post(urlMatching("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .withHeader("Content-Type", equalTo("application/json"))
                .willReturn(
                    aResponse()
                        .withHeader("Content-Type", "application/json")
                        .withStatus(HttpStatus.CREATED.value())
                        .withBody(objectMapper.writeValueAsString(city))
                )
        )

        val citiesClient = CitiesClient(webClientBuilder, "http://localhost:${WIREMOCK_SERVER.port()}")

        val citiesMono: Mono<City> = citiesClient.createCity(city)

        StepVerifier
            .create(citiesMono)
            .expectNext(city)
            .expectComplete()
            .verify()


        //Ensure that date field is in ISO-8601 format..
        WIREMOCK_SERVER.verify(
            postRequestedFor(urlPathMatching("/cities"))
                .withRequestBody(matchingJsonPath("$.creationDate", equalTo(dateAsString)))
        )
    }

    companion object {
        private val WIREMOCK_SERVER =
            WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().notifier(ConsoleNotifier(true)))

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

In the highlighted lines, I want to make sure that the remote service receives the date in ISO-8601 format as "1985-02-01T10:10:10Z". In this instance everything works cleanly and the test passes.

Not so happy path

Consider now a case where I have customized the WebClient.Builder in some form. An example is say I am using a registry service and I want to look up a remote service via this registry and then make a call then the WebClient has to be customized to add a "@LoadBalanced" annotation on it - some details here

So say, I have customized WebClient.Builder this way:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder().filter { req, next ->
            LOGGER.error("Custom filter invoked..")
            next.exchange(req)
        }
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

It looks straightforward, however now the previous test fails. Specifically the date format of the creationDate over the wire is not ISO-8601 anymore, the raw request looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": 476100610.000000000
}

vs for a working request:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

See how the date format is different.

Problem

The underlying reason for this issue is simple, Spring Boot adds a bunch of configuration on WebClient.Builder that is lost when I have explicitly created the bean myself. Specifically in this instance there is a Jackson ObjectMapper created under the covers which by default writes dates as timestamps - some details here.

Solution

Okay, so how do we get back the customizations that Spring Boot makes. I have essentially replicated the behavior of a auto-configuration in Spring called "WebClientAutoConfiguration" and it looks like this:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(customizerProvider: ObjectProvider<WebClientCustomizer>): WebClient.Builder {
        val webClientBuilder: WebClient.Builder = WebClient
            .builder()
            .filter { req, next ->
                LOGGER.error("Custom filter invoked..")
                next.exchange(req)
            }

        customizerProvider.orderedStream()
            .forEach { customizer -> customizer.customize(webClientBuilder) }

        return webClientBuilder;
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

There is a likely a better approach than just replicating this behavior, but this approach works for me.

The posted content now looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

with the date back in the right format.

Conclusion

Spring Boot's auto-configurations for WebClient provides a opinionated set of defaults. If for any reason the WebClient and it's builder need to be configured explicitly then be wary of some of the customizations that Spring Boot adds and replicate it for the customized bean. In my case, the Jackson customization for Java 8 dates was missing in my custom "WebClient.Builder" and had to be explicitly accounted for.

A sample test and a customization is available here

Thursday, December 26, 2019

Project reactor - de-structuring a Tuple

Tuples are simple data structures that hold a fixed set of items, each of a different data type.

Project Reactor provides a Tuple data structure that can hold about 8 different types.

Tuples are very useful, however one of the issues in using a Tuple is that it is difficult to make out what they hold without de-structuring them at every place they get used.

Problem

Consider a simple operator, which zips together a String and an integer this way:

Mono<Tuple2<String,  Integer>> tup = Mono.zip(Mono.just("a"), Mono.just(2))

The output is a Tuple holding 2 elements.

Now, the problem that I have is this. I prefer the tuple to be dereferenced into its component elements before doing anything with it.

Say with the previous tuple, I want to generate as many of the strings(first element of the tuple) as the count(the second element of the tuple), expressed the following way:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany(tup -> {
        return Flux.range(1, tup.getT2()).map(i -> tup.getT1() + i);
    })

The problem here is when referring to "tup.getT1()" and "tup.getT2()", it is not entirely clear what the tuple holds. Though more verbose I would prefer doing something like this:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany(tup -> {
        String s = tup.getT1();
        int count = tup.getT2();

        return Flux.range(1, count).map(i -> s + i);
    });

Solution

The approach of de-structuring into explicit variables with meaningful name works, but it is a little verbose. A far better approach is provided using a utility set of functions that Project reactor comes with called TupleUtils

I feel it is best explained using a sample, with TupleUtils the previous de-structuring looks like this:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany(TupleUtils.function((s, count) ->
            Flux.range(1, count).map(i -> s + i)))

This looks far more concise than explicit de-structuring. There is a bit of cleverness that needs some getting used to though:

The signature of flatMapMany is -

public final <R> Flux<R> flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapper)

TupleUtils provides another indirection which returns the Function required above, through another function:

public static <T1, T2, R> Function<Tuple2<T1, T2>, R> function(BiFunction<T1, T2, R> function) {
    return tuple -> function.apply(tuple.getT1(), tuple.getT2());
}

If you are using Kotlin, there is a simpler approach possible. This is based on the concept of "Destructuring Declarations". Project reactor provides a set of Kotlin helper utilities using an additional gradle dependency:

implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")

With this dependency in place, an equivalent Kotlin code looks like this:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany { (s: String, count: Int) ->
        Flux.range(1, count).map { i: Int -> s + i }
    }

See how a tuple has been de-structured directly into variables. This looks like this in isolation:

val (s: String, count: Int) = tup

Conclusion

It is important to de-structure a tuple into more meaningful variables to improve readability of the code and the useful functions provided by the TupleUtils as well as the Kotlin extensions helps keep the code concise but readable.

The Java samples are here and Kotlin samples here

Tuesday, November 12, 2019

Hash a Json

I recently wrote a simple library to predictably hash a json.

The utility is built on top of the excellent Jackson Json parsing library


Problem

I needed a hash generated out of a fairly large json based content to later determine if the content has changed at all. Treating json as a string is not an option as formatting, shuffling of keys can skew the results.

Solution

The utility is simple - it traverses the Jackson JsonNode representation of the json:

1. For every object node, it sorts the keys and then traverses the elements, calculates aggregated hash from all the children
2. For every array node, it traverses to the elements and aggregates the hash
3. For every terminal node, it takes the key and value and generates the SHA-256 hash from it


This way the hash is generated for the entire tree.

Consider a Jackson Json Node, created the following way in code:

ObjectNode jsonNode = JsonNodeFactory
        .instance
        .objectNode()
        .put("key1", "value1");

jsonNode.set("key2", JsonNodeFactory.instance.objectNode()
        .put("child-key2", "child-value2")
        .put("child-key1", "child-value1")
        .put("child-key3", 123.23f));

jsonNode.set("key3", JsonNodeFactory.instance.arrayNode()
        .add("arr-value1")
        .add("arr-value2"));

String calculatedHash = sha256Hex(
        sha256Hex("key1") + sha256Hex("value1")
                + sha256Hex("key2") + sha256Hex(
                sha256Hex("child-key1") + sha256Hex("child-value1")
                        + sha256Hex("child-key2") + sha256Hex("child-value2")
                        + sha256Hex("child-key3") + sha256Hex("123.23"))
                + sha256Hex("key3") + sha256Hex(
                sha256Hex("arr-value1")
                        + sha256Hex("arr-value2"))
);

Here the json has 3 keys, "key1", "key2", "key3". "key1" has a primitive text field, "key2" is an object node, "key3" is an array of strings. The calculatedHash shows how the aggregated hash is calculated for the entire tree, the utility follows the same process to aggregate a hash.

If you are interested in giving this a whirl, the library is available in bintray - https://bintray.com/bijukunjummen/repo/json-hash and hosted on github here - https://github.com/bijukunjummen/json-hash

Sunday, September 15, 2019

Unit test for Spring's WebClient

WebClient to quote its Java documentation is Spring Framework's
Non-blocking, reactive client to perform HTTP requests, exposing a fluent, reactive API over underlying HTTP client libraries such as Reactor Netty
.

In my current project I have been using WebClient extensively in making service to service calls and have found it to be an awesome API and I love its use of fluent interface.

Consider a remote service which returns a list of "Cities". A code using WebClient looks like this:

...
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import java.net.URI

class CitiesClient(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) {

    fun getCities(): Flux<City> {
        val buildUri: URI = UriComponentsBuilder
                .fromUriString(citiesBaseUrl)
                .path("/cities")
                .build()
                .encode()
                .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.get()
                .uri(buildUri)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany { clientResponse ->
                    clientResponse.bodyToFlux<City>()
                }
    }
}

It is difficult to test a client making use of WebClient though. In this post, I will go over the challenges in testing a client using WebClient and a clean solution.

Challenges in mocking WebClient

An effective unit test of the "CitiesClient" class would require mocking of WebClient and every method call in the fluent interface chain along these lines:

val mockWebClientBuilder: WebClient.Builder = mock()
val mockWebClient: WebClient = mock()
whenever(mockWebClientBuilder.build()).thenReturn(mockWebClient)

val mockRequestSpec: WebClient.RequestBodyUriSpec = mock()
whenever(mockWebClient.get()).thenReturn(mockRequestSpec)
val mockRequestBodySpec: WebClient.RequestBodySpec = mock()

whenever(mockRequestSpec.uri(any<URI>())).thenReturn(mockRequestBodySpec)

whenever(mockRequestBodySpec.accept(any())).thenReturn(mockRequestBodySpec)

val citiesJson: String = this.javaClass.getResource("/sample-cities.json").readText()

val clientResponse: ClientResponse = ClientResponse
        .create(HttpStatus.OK)
        .header("Content-Type","application/json")
        .body(citiesJson).build()

whenever(mockRequestBodySpec.exchange()).thenReturn(Mono.just(clientResponse))

val citiesClient = CitiesClient(mockWebClientBuilder, "http://somebaseurl")

val cities: Flux<City> = citiesClient.getCities()

This makes for an extremely flaky test as any change in the order of calls would result in new mocks that will need to be recorded.

Testing using real endpoints


An approach that works well is to bring up a real server that behaves like the target of a client. Two mock servers that work really well are mockwebserver in okhttp library and WireMock. An example with Wiremock looks like this:

import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.core.WireMockConfiguration
import org.bk.samples.model.City
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.springframework.http.HttpStatus
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import reactor.test.StepVerifier

class WiremockWebClientTest {

    @Test
    fun testARemoteCall() {
        val citiesJson = this.javaClass.getResource("/sample-cities.json").readText()
        WIREMOCK_SERVER.stubFor(WireMock.get(WireMock.urlMatching("/cities"))
                .withHeader("Accept", WireMock.equalTo("application/json"))
                .willReturn(WireMock.aResponse()
                        .withStatus(HttpStatus.OK.value())
                        .withHeader("Content-Type", "application/json")
                        .withBody(citiesJson)))

        val citiesClient = CitiesClient(WebClient.builder(), "http://localhost:${WIREMOCK_SERVER.port()}")

        val cities: Flux<City> = citiesClient.getCities()
        
        StepVerifier
                .create(cities)
                .expectNext(City(1L, "Portland", "USA", 1_600_000L))
                .expectNext(City(2L, "Seattle", "USA", 3_200_000L))
                .expectNext(City(3L, "SFO", "USA", 6_400_000L))
                .expectComplete()
                .verify()
    }

    companion object {
        private val WIREMOCK_SERVER = WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort())

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

Here a server is being brought up at a random port, it is then injected with a behavior and then the client is tested against this server and validated. This approach works and there is no muddling with the internals of WebClient in mocking this behavior, but technically this is an integration test and it will be slower to execute than a pure unit test.


Unit testing by short-circuiting the remote call

An approach that I have been using recently is to short circuit the remote call using an ExchangeFunction. An ExchangeFunction represents the actual mechanisms in making the remote call and can be replaced with one that responds with what the test expects the following way:

import org.junit.jupiter.api.Test
import org.springframework.http.HttpStatus
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.ExchangeFunction
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier

class CitiesWebClientTest {

    @Test
    fun testCleanResponse() {
        val citiesJson: String = this.javaClass.getResource("/sample-cities.json").readText()

        val clientResponse: ClientResponse = ClientResponse
                .create(HttpStatus.OK)
                .header("Content-Type","application/json")
                .body(citiesJson).build()
        val shortCircuitingExchangeFunction = ExchangeFunction {
            Mono.just(clientResponse)
        }

        val webClientBuilder: WebClient.Builder = WebClient.builder().exchangeFunction(shortCircuitingExchangeFunction)
        val citiesClient = CitiesClient(webClientBuilder, "http://somebaseurl")

        val cities: Flux<City> = citiesClient.getCities()

        StepVerifier
                .create(cities)
                .expectNext(City(1L, "Portland", "USA", 1_600_000L))
                .expectNext(City(2L, "Seattle", "USA", 3_200_000L))
                .expectNext(City(3L, "SFO", "USA", 6_400_000L))
                .expectComplete()
                .verify()
    }
}

The WebClient is injected with a ExchangeFunction which simply returns a response with the expected behavior of the remote server. This has short circuited the entire remote call and allows the client to be tested comprehensively. This approach depends on a little knowledge of the internals of the WebClient. This is a decent compromise though as it would run far faster than a test using WireMock.

This approach is not original though, I have based this test on some of the tests used for testing WebClient itself, for eg, the one here


Conclusion

I personally prefer the last approach, it has enabled me to write fairly comprehensive unit tests for a Client making use of WebClient for remote calls. My project with fully working samples is here.

Tuesday, August 6, 2019

Chicken and egg - resolving Spring properties ahead of a test

Consider a service class responsible for making a remote call and retrieving a detail:


...
public class CitiesService {
    private final WebClient.Builder webClientBuilder;
    private final String baseUrl;

    public CitiesService(
            WebClient.Builder webClientBuilder,
            @Value("${cityservice.url}") String baseUrl) {
        this.webClientBuilder = webClientBuilder;
        this.baseUrl = baseUrl;
    }


    public Flux<City> getCities() {
        return this.webClientBuilder.build()
                .get()
....

This is a Spring Bean and resolves the url to call through a property called "cityservice.url".

If I wanted to test this class, an approach that I have been using when using WebClient is to start a mock server using the excellent Wiremock and using it to test this class. A Wiremock mock looks like this:

    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().dynamicPort());


    .....

    WIREMOCK_SERVER.stubFor(get(urlEqualTo("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .willReturn(aResponse()
                        .withStatus(200)
                        .withHeader("Content-Type", "application/json")
                        .withBody(resultJson)));

The Wiremock server is being started up at a random port and it is set to respond to an endpoint called "/cities". Here is where the chicken and egg problem comes up:

1. The CitiesService class requires a property called "cityservice.url" to be set before starting the test.
2. Wiremock is started at a random port and the url that it is responding to is "http://localhost:randomport" and is available only once the test is kicked off.


There are three potential solutions that I can think of to break this circular dependency:

Approach 1: To use a hardcoded port

This approach depends on starting up Wiremock on a fixed port instead of a dynamic port, this way the property can be set when starting the test up, something like this:

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = CitiesServiceHardcodedPortTest.SpringConfig.class,
        properties = "cityservice.url=http://localhost:9876")
public class CitiesServiceHardcodedPortTest {
    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().port(9876));


Here Wiremock is being started at port 9876 and the property at startup is being set to "http://localhost:9876/".

This solves the problem, however, this is not CI server friendly, it is possible for the ports to collide at runtime and this makes for a flaky test.


Approach 2: Not use Spring for test

A better approach is to not use the property, along these lines:

public class CitiesServiceDirectTest {
    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().dynamicPort());

    private CitiesService citiesService;

    @BeforeEach
    public void beforeEachTest() {
        final WebClient.Builder webClientBuilder = WebClient.builder();

        this.citiesService = new CitiesService(webClientBuilder, WIREMOCK_SERVER.baseUrl());
    }


Here the service is being created by explicitly setting the baseUrl in the constructor, thus avoiding the need to set a property ahead of the test.


Approach 3: Application Context Initializer


ApplicationContextInitializer is used for programmatically initializing a Spring Application Context and it can be used with a test to inject in the property before the actual test is executed. Along these lines:

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = CitiesServiceSpringTest.SpringConfig.class)
@ContextConfiguration(initializers = {CitiesServiceSpringTest.PropertiesInitializer.class})
public class CitiesServiceSpringTest {
    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().dynamicPort());

    @Autowired
    private CitiesService citiesService;

    @Test
    public void testGetCitiesCleanFlow() throws Exception {
        ...
    }



    static class PropertiesInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(ConfigurableApplicationContext applicationContext) {
            TestPropertyValues.of(
                    "cityservice.url=" + "http://localhost:" + WIREMOCK_SERVER.port()
            ).applyTo(applicationContext.getEnvironment());
        }
    }

}

Wiremock is started up first, then Spring context is initialized using the initializer which injects in the "cityservice.url" property using the Wiremocks dynamic port, this way the property is available for wiring into CityService.


Conclusion

I personally prefer Approach 2, however it is good to have Spring's wiring and the dependent beans created ahead of the test and if the class utilizes these then I prefer Approach 3. Application Context initializer provides a good way to break the chicken and egg problem with properties like these which need to be available ahead of Spring's context getting engaged.

All the code samples are available here:

Approach 1: https://github.com/bijukunjummen/reactive-cities-demo/blob/master/src/test/java/samples/geo/service/CitiesServiceHardcodedPortTest.java
Approach 2: https://github.com/bijukunjummen/reactive-cities-demo/blob/master/src/test/java/samples/geo/service/CitiesServiceDirectTest.java
Approach 3: https://github.com/bijukunjummen/reactive-cities-demo/blob/master/src/test/java/samples/geo/service/CitiesServiceSpringTest.java

Sunday, June 9, 2019

Callback hell and Reactive patterns

One of the ways that I have better understood the usefulness of a Reactive Streams based approach is how it simplifies a Non-blocking IO call.

This post will be a quick walkthrough of the kind of code involved in making a synchronous remote call, then show how layering in Non-blocking IO though highly efficient in the use of resources(especially threads) introduces complications referred to as a callback hell and how a reactive streams based approach simplifies the programming model.


Target Service


Since I will be writing a client call, my target service representing the details of a City has two endpoints. One returning a list of city id's when called with a uri of type - "/cityids" and a sample result looks like this:

[
    1,
    2,
    3,
    4,
    5,
    6,
    7
]

and an endpoint returning the details of a city given its id, for example when called using an id of 1 - "/cities/1":

{
    "country": "USA",
    "id": 1,
    "name": "Portland",
    "pop": 1600000
}

The client's responsibility is to get the list of city id's and then for each city id get the detail of the city and put it together into a list of cities.

Synchronous call


I am using Spring Framework's RestTemplate to make the remote call. A Kotlin function to get the list of cityids looks like this:

private fun getCityIds(): List<String> {
    val cityIdsEntity: ResponseEntity<List<String>> = restTemplate
            .exchange("http://localhost:$localServerPort/cityids",
                    HttpMethod.GET,
                    null,
                    object : ParameterizedTypeReference<List<String>>() {})
    return cityIdsEntity.body!!
}

and to get the details of a city:

private fun getCityForId(id: String): City {
    return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}

Given these two functions it is easy to compose them such that a list of cities is returned:

val cityIds: List<String> = getCityIds()
val cities: List<City> = cityIds
        .stream()
        .map<City> { cityId -> getCityForId(cityId) }
        .collect(Collectors.toList())

cities.forEach { city -> LOGGER.info(city.toString()) }

The code is very easy to understand, however, there are 8 blocking calls involved -
1. to get the list of 7 city ids and then to get the details for each
2. To get the details of each of the 7 cities

Each of these calls would have been on a different thread.

Using Non-Blocking IO with callback

I will be using a library called AsyncHttpClient to make a non-blocking IO call.

AyncHttpClient returns a ListenableFuture type when a remote call is made.

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
                .prepareGet("http://localhost:$localServerPort/cityids")
                .execute()

A callback can be attached to a Listenable future to act on the response when available.

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
            object : TypeReference<List<Long>>() {})
    ....
}        

Given the list of cityids I want to get the details of the city, so from the response I need to make more remote calls and attach a callback for each of the calls to get the details of the city along these lines:

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
        .prepareGet("http://localhost:$localServerPort/cityids")
        .execute()

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
            object : TypeReference<List<Long>>() {})

    cityIds.stream().map { cityId ->
        val cityListenableFuture = asyncHttpClient
                .prepareGet("http://localhost:$localServerPort/cities/$cityId")
                .execute()

        cityListenableFuture.addListener(Runnable {
            val cityDescResp = cityListenableFuture.get()
            val cityDesc = cityDescResp.responseBody
            val city = objectMapper.readValue(cityDesc, City::class.java)
            LOGGER.info("Got city: $city")
        }, executor)
    }.collect(Collectors.toList())
}, executor)

This is a gnarly piece of code, there is set of callbacks within a callback which is very difficult to reason about and make sense of and hence referred to as the callback hell.


Using Non-Blocking IO with Java CompletableFuture


This code can be improved a little by returning a Java's CompletableFuture as the return type instead of the ListenableFuture. CompletableFuture provides operators that allow the return type to modified and returned.

As an example, consider the function to get the list of city ids:

private fun getCityIds(): CompletableFuture<List<Long>> {
    return asyncHttpClient
            .prepareGet("http://localhost:$localServerPort/cityids")
            .execute()
            .toCompletableFuture()
            .thenApply { response ->
                val s = response.responseBody
                val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
                l
            }
}

Here I am using the "thenApply" operator to transform "CompletableFuture<Response>" to "CompletableFuture<List<Long>>

And similarly to get the detail a city:

private fun getCityDetail(cityId: Long): CompletableFuture<City> {
    return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
            .execute()
            .toCompletableFuture()
            .thenApply { response ->
                val s = response.responseBody
                LOGGER.info("Got {}", s)
                val city = objectMapper.readValue(s, City::class.java)
                city
            }
}

This is an improvement from the Callback based approach, however, CompletableFuture lacks sufficient operators, say in this specific instance where all the city details need to be put together:

val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds()
val citiesCompletableFuture: CompletableFuture<List<City>> =
        cityIdsFuture
                .thenCompose { l ->
                    val citiesCompletable: List<CompletableFuture<City>> =
                            l.stream()
                                    .map { cityId ->
                                        getCityDetail(cityId)
                                    }.collect(toList())

                    val citiesCompletableFutureOfList: CompletableFuture<List<City>> =
                            CompletableFuture.allOf(*citiesCompletable.toTypedArray())
                                    .thenApply { _: Void? ->
                                        citiesCompletable
                                                .stream()
                                                .map { it.join() }
                                                .collect(toList())
                                    }
                    citiesCompletableFutureOfList
                }

I have used an operator called CompletableFuture.allOf which returns a "Void" type and has to be coerced to return the desired type of ""CompletableFuture<List<City>>.


Using Project Reactor

Project Reactor is an implementation of the Reactive Streams specification. It has two specialized types to return a stream of 0/1 item and a stream of 0/n items - the former is a Mono, the latter a Flux.

Project Reactor provides a very rich set of operators that allow the stream of data to be transformed in a variety of ways. Consider first the function to return a list of City ids:

private fun getCityIds(): Flux<Long> {
    return webClient.get()
            .uri("/cityids")
            .exchange()
            .flatMapMany { response ->
                LOGGER.info("Received cities..")
                response.bodyToFlux<Long>()
            }
}

I am using Spring's excellent WebClient library to make the remote call and get a Project reactor "Mono<ClientResponse>" type of response, which can be modified to a "Flux<Long>" type using the "flatMapMany" operator.

Along the same lines to get the detail of the city, given a city id:

private fun getCityDetail(cityId: Long?): Mono<City> {
    return webClient.get()
            .uri("/cities/{id}", cityId!!)
            .exchange()
            .flatMap { response ->
                val city: Mono<City> = response.bodyToMono()
                LOGGER.info("Received city..")
                city
            }
}

Here a Project reactor "Mono<ClientResponse>" type is being transformed to "Mono<City>" type using the "flatMap" operator.

and the code to get the cityids and then the City's from it:

val cityIdsFlux: Flux<Long> = getCityIds()
val citiesFlux: Flux<City> = cityIdsFlux
        .flatMap { this.getCityDetail(it) }

return citiesFlux


This is very expressive - contrast the mess of a callback based approach and the simplicity of the reactive streams based approach.


Conclusion

In my mind, this is one of the biggest reasons to use a Reactive Streams based approach and in particular Project Reactor for scenarios that involve crossing asynchronous boundaries like in this instance to make remote calls. It cleans up the mess of callbacks and callback hells and provides a natural approach of modifying/transforming types using a rich set of operators.


My repository with a working version of all the samples that I have used here is available at https://github.com/bijukunjummen/reactive-cities-demo/tree/master/src/test/kotlin/samples/geo/kotlin

Tuesday, April 30, 2019

Functional Hystrix using Spring Cloud HystrixCommands

Spring's WebClient provides a non-blocking client for making service to service calls. Hystrix, though now in a maintenance mode, has been used for protecting service to service calls by preventing cascading failures, providing circuit breakers for calls to slow or faulty upstream services.

In this post, I will be exploring how Spring Cloud provides a newer functional approach to wrapping a remote call with Hystrix.


Consider a simple service that returns a list of entities, say a list of cities, modeled using the excellent Wiremock tool:

WIREMOCK_SERVER.stubFor(WireMock.get(WireMock.urlMatching("/cities"))
                .withHeader("Accept", WireMock.equalTo("application/json"))
                .willReturn(WireMock.aResponse()
                        .withStatus(HttpStatus.OK.value())
                        .withFixedDelay(5000)
                        .withHeader("Content-Type", "application/json")))

When called with a uri of the type "/cities" this Wiremock endpoint responds with a json of the following type:

[
  {
    "country": "USA",
    "id": 1,
    "name": "Portland",
    "pop": 1600000
  },
  {
    "country": "USA",
    "id": 2,
    "name": "Seattle",
    "pop": 3200000
  },
  {
    "country": "USA",
    "id": 3,
    "name": "SFO",
    "pop": 6400000
  }
]

after a delay of 5 seconds.

Traditional approach


There are many approaches to using Hystrix, I have traditionally preferred an approach where an explicit Hystrix Command protects the remote call, along these lines:

import com.netflix.hystrix.HystrixCommandGroupKey
import com.netflix.hystrix.HystrixCommandKey
import com.netflix.hystrix.HystrixCommandProperties
import com.netflix.hystrix.HystrixObservableCommand
import org.bk.samples.model.City
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams
import rx.schedulers.Schedulers
import java.net.URI


class CitiesHystrixCommand(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) : HystrixObservableCommand<City>(
        HystrixObservableCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("cities-service"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("cities-service"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(4000))) {
    override fun construct(): Observable<City> {
        val buildUri: URI = UriComponentsBuilder
                .fromUriString(citiesBaseUrl)
                .path("/cities")
                .build()
                .encode()
                .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        val result: Flux<City> = webClient.get()
                .uri(buildUri)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany { clientResponse ->
                    clientResponse.bodyToFlux<City>()
                }

        return RxReactiveStreams.toObservable(result)
    }

    override fun resumeWithFallback(): Observable<City> {
        LOGGER.error("Falling back on cities call", executionException)
        return Observable.empty()
    }

    companion object {
        private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand::class.java)
    }
}

This code can now be used to make a remote call the following way:

import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient


class CitiesHystrixCommandBasedClient(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) {
    fun getCities(): Flux<City> {
        val citiesObservable: Observable<City> = CitiesHystrixCommand(webClientBuilder, citiesBaseUrl)
                .observe()
                .subscribeOn(Schedulers.io())

        return Flux
                .from(RxReactiveStreams
                        .toPublisher(citiesObservable))
    }
}


Two things to note here,
1. WebClient returns a Project Reactor "Flux" type representing a list of cities, however Hystrix is Rx-Java 1 based, so Flux is being transformed to Rx-Java Observable using "RxReactiveStreams.toObservable()" call, provided by the RxJavaReactiveStreams library here.

2. I still want Project Reactor "Flux" type to be used in the rest of the application, so there is another adapter that converts the Rx-Java Observable back to a Flux - "Flux.from(RxReactiveStreams.toPublisher(citiesObservable))" once the call wrapped in Hystrix returns.


If I were to try this client with the wiremock sample with the 5 second delay, it correctly handles the delay and returns after a second.


Functional approach


There is a lot of boiler-plate with the previous approach which is avoided with the new functional approach of using HystrixCommands, a utility class which comes with Spring Cloud which provides a functional approach to making the remote call wrapped with Hystrix.

The entirety of the call using HystrixCommands looks like this:

import com.netflix.hystrix.HystrixCommandProperties
import org.bk.samples.model.City
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.cloud.netflix.hystrix.HystrixCommands
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import rx.schedulers.Schedulers
import java.net.URI

class CitiesFunctionalHystrixClient(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) {
    fun getCities(): Flux<City> {
        return HystrixCommands
                .from(callCitiesService())
                .commandName("cities-service")
                .groupName("cities-service")
                .commandProperties(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(1000)
                )
                .toObservable { obs ->
                    obs.observe()
                            .subscribeOn(Schedulers.io())
                }
                .fallback { t: Throwable ->
                    LOGGER.error(t.message, t)
                    Flux.empty()
                }
                .toFlux()
    }

    fun callCitiesService(): Flux<City> {
        val buildUri: URI = UriComponentsBuilder
                .fromUriString(citiesBaseUrl)
                .path("/cities")
                .build()
                .encode()
                .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.get()
                .uri(buildUri)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany { clientResponse ->
                    clientResponse.bodyToFlux<City>()
                }
    }

    companion object {
        private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand::class.java)
    }
}


A lot of boiler-plate is avoided with this approach -
1. an explicit command is not required anymore
2. the call and the fallback are coded in a fluent manner
3. Any overrides can be explicitly specified - in this specific instance the timeout of 1 second.

Conclusion

I like the conciseness which HystrixCommands brings to the usage of Hystrix with WebClient. I have the entire sample available in my github repo - https://github.com/bijukunjummen/webclient-hystrix-sample, all the dependencies required to get the samples to work is part of this repo. If you are interested in sticking with Rx-Java 1, then an approach described here may help you avoid boiler-plate with vanilla Hystrix