Sunday, March 22, 2020

Processing SQS Messages using Spring Boot and Project Reactor

I recently worked on a project where I had to efficiently process a large number of messages streaming in through an AWS SQS queue. In this post (and potentially one more), I will go over the approach that I took to process the messages using the excellent Project Reactor.



The following is the kind of set-up that I am aiming for:



Setting up a local AWS Environment

Before I jump into the code, let me get some preliminaries out of the way. First, how do you get a local version of SNS and SQS? One of the easiest ways is to use localstack; I use a docker-compose based version of it described here

The second utility that I will be using is the AWS CLI. This website has details on how to install it locally.

Once both of these utilities are in place, a quick test should validate the setup:

# Create a queue
aws --endpoint http://localhost:4576 sqs create-queue --queue-name test-queue

# Send a sample message
aws --endpoint http://localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"

# Receive the message
aws --endpoint http://localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue


Basics of Project Reactor


Project Reactor implements the Reactive Streams specification and provides a way of handling streams of data across asynchronous boundaries while respecting backpressure. That is a lot of words, but in essence think of it this way:
1. SQS Produces data
2. The application is going to consume and process it as a stream of data
3. The application should consume data at a pace that it can sustain - too much data should not be pumped in. This is formally referred to as "Backpressure" (illustrated in the sketch below)
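
To make the notion of backpressure concrete before getting to SQS, here is a small self-contained sketch - purely illustrative and not part of the SQS pipeline - of a subscriber that explicitly requests a bounded number of items at a time using Reactor's "BaseSubscriber":

import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

fun main() {
    // A producer capable of emitting far more data than the consumer wants at once
    val numbers: Flux<Int> = Flux.range(1, 1_000)

    // A consumer that signals demand explicitly - it asks for 5 items,
    // processes them, and only then requests the next 5
    numbers.subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            request(5)
        }

        override fun hookOnNext(value: Int) {
            println("Processing $value")
            if (value % 5 == 0) {
                request(5)
            }
        }
    })
}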



AWS SDK 2

The library that I will be using to consume AWS SQS messages is the AWS SDK 2. The async client in this library uses non-blocking IO under the covers.

The library offers both a synchronous and an asynchronous way of making calls. Consider the synchronous way to fetch records from an SQS queue:

import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()

Here "software.amazon.awssdk.services.sqs.SqsClient" is being used for querying sqs and retrieving a batch of results synchronously. An async result, on the other hand, looks like this:



import java.util.concurrent.CompletableFuture
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: CompletableFuture<List<Message>> = sqsAsyncClient
    .receiveMessage(receiveMessageRequest)
    .thenApply { result -> result.messages() }


The output now is a "CompletableFuture" of a list of messages.
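
Since the rest of the pipeline is built on Reactor types, one way to bridge this "CompletableFuture" into the reactive world - my own sketch, not something the SDK requires - is with "Mono.fromFuture", reusing the "sqsAsyncClient" and "receiveMessageRequest" from the snippet above:

import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import software.amazon.awssdk.services.sqs.model.Message

// Wrap the async SDK call in a Mono and flatten the batch into individual messages
val messagesFlux: Flux<Message> = Mono
    .fromFuture { sqsAsyncClient.receiveMessage(receiveMessageRequest) }
    .map { response -> response.messages() }
    .flatMapMany { messages -> Flux.fromIterable(messages) }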

Infinite loop and no backpressure

My first attempt at creating a stream (Flux) of messages is fairly simple - an infinite loop that polls AWS SQS and creates a Flux from it using the "Flux.create" operator, this way:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.create { sink: FluxSink<List<Message>> ->
            while (running) {
                try {
                    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(10)
                        .build()

                    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
                    LOGGER.info("Received: $messages")
                    sink.next(messages)
                } catch (e: InterruptedException) {
                    LOGGER.error(e.message, e)
                } catch (e: Exception) {
                    LOGGER.error(e.message, e)
                }
            }
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that there is an infinite loop that checks for new messages using long-polling. Messages may not be available at every poll, in which case an empty list is added to the stream.

This list of at most 5 messages is then flattened into a stream of individual messages using the "flatMapIterable" operator. Each message is further mapped by extracting the body from the SNS wrapper (as a message gets forwarded from SNS to SQS, SNS adds a wrapper around it), and is paired with a way to delete the message (the delete handle) once the message has been successfully processed.
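
The "readSnsNotification" helper is not shown in the snippet above. A minimal sketch of it, assuming Jackson (with the Kotlin module) for parsing the SNS envelope - the actual implementation in the repository may differ - could look like this:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

// SNS wraps the original payload in a JSON envelope - the original message
// is carried in the "Message" field of that envelope
@JsonIgnoreProperties(ignoreUnknown = true)
data class SnsNotification(
    @JsonProperty("Message") val message: String,
    @JsonProperty("Type") val type: String? = null
)

private val objectMapper = jacksonObjectMapper()

fun readSnsNotification(snsMessageBody: String): SnsNotification =
    objectMapper.readValue(snsMessageBody)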


This approach works perfectly fine... but imagine a case where a huge number of messages have come in. Since the loop is not really aware of the throughput downstream, it will keep pumping data into the stream. The default behavior is for the intermediate operators to buffer this incoming data based on how the final consumer is consuming it. Since this buffer is unbounded, it is possible that the system may reach an unsustainable state.


Backpressure aware stream

The fix is to use a different operator to generate the stream of data - "Flux.generate". Using this operator, the code looks like this:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.generate { sink: SynchronousSink<List<Message>> ->
            val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(5)
                .waitTimeSeconds(10)
                .build()

            val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
            LOGGER.info("Received: $messages")
            sink.next(messages)
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that the block passed to the "Flux.generate" operator is called repeatedly - similar to the while loop - and in each invocation one item is expected to be added to the stream. In this instance, the item added to the stream happens to be a list, which like before is broken down into individual messages.

How does backpressure work in this scenario? Consider again the case where the downstream consumer is processing at a slower rate than the generating end. In this case, Flux itself slows down the rate at which the generate block is called, thus respecting the throughput of the downstream system.
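
To see this in action, a purely hypothetical slow consumer of the "listen" Flux could look like the following. Since "concatMap" processes one element at a time, demand flows upstream only as fast as the consumer finishes, and the generate block is invoked accordingly ("Duration" here is java.time.Duration):

val subscription = listen()
    .concatMap { (message, deleteHandle) ->
        // Simulate slow processing - roughly one message a second
        Mono.fromCallable {
            LOGGER.info("Processing message: $message")
            deleteHandle() // remove the message from the queue once it is processed
        }.delaySubscription(Duration.ofSeconds(1))
    }
    .subscribe()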

Conclusion

This sets up a good pipeline for processing messages from SQS. There are a few more nuances to this - processing messages in parallel later in the stream, for instance - which I will cover in a future post.

The codebase of this example is available in my github repository here - https://github.com/bijukunjummen/boot-with-sns-sqs. The code has a complete pipeline which includes processing the message and deleting it once processed.

Tuesday, February 25, 2020

Project Reactor expand method

One of my colleagues at work recently introduced me to the expand operator of the Project Reactor types and in this post I want to cover a few ways in which I have used it.


Unrolling a Paginated Result

Consider a Spring Data based repository on a model called City:

import org.springframework.data.jpa.repository.JpaRepository;
import samples.geo.domain.City;

public interface CityRepo extends JpaRepository<City, Long> {
}

This repository provides a way to retrieve the paginated result, along the following lines:

cityRepo.findAll(PageRequest.of(0, 5))

Now, if I were to unroll multiple pages into a result, the way to do it would be the following kind of loop:

var pageable: Pageable = PageRequest.of(0, 5)
do {
    var page: Page<City> = cityRepo.findAll(pageable)
    page.content.forEach { city -> LOGGER.info("City $city") }
    pageable = page.nextPageable()
} while (page.hasNext())


An equivalent unroll of a paginated result can be done using the Reactor expand operator the following way:

val result: Flux<City> =
    Mono
        .fromSupplier { cityRepo.findAll(PageRequest.of(0, 5)) }
        .expand { page ->
            if (page.hasNext())
                Mono.fromSupplier { cityRepo.findAll(page.nextPageable()) }
            else
                Mono.empty()
        }
        .flatMap { page -> Flux.fromIterable(page.content) }

result.subscribe(
    { city -> LOGGER.info("City $city") },
    { t -> t.printStackTrace() }
)


Here the first page of results expands to the second page, the second page to the third page and so on until there are no pages to retrieve.

Traversing a Tree


Consider a node in a tree structure represented by the following model:

data class Node(
    val id: String,
    val childRefs: List<String>,
)

Sample data forming a small tree - a root node "1" with children "1-1" and "1-2", each of which in turn has two children of its own - can be traversed using a call which looks like this:

val rootMono: Mono<Node> = nodeService.getNode("1")
val expanded: Flux<Node> = rootMono.expand { node ->
    Flux.fromIterable(node.childRefs)
        .flatMap { nodeRef -> nodeService.getNode(nodeRef) }
}
expanded.subscribe { node -> println(node) }

This is a breadth-first expansion; the output looks like this:

Node-1
Node-1-1
Node-1-2
Node-1-1-1
Node-1-1-2
Node-1-2-1
Node-1-2-2

An expandDeep variation would traverse the same tree depth-first.
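
A sketch of the depth-first equivalent - only the operator changes, reusing the "rootMono" and the hypothetical "nodeService" from above:

// expandDeep explores each branch fully before moving on to the next sibling
val expandedDepthFirst: Flux<Node> = rootMono.expandDeep { node ->
    Flux.fromIterable(node.childRefs)
        .flatMap { nodeRef -> nodeService.getNode(nodeRef) }
}
expandedDepthFirst.subscribe { node -> println(node) }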

Monday, January 20, 2020

Spring WebClient and Java date-time fields

WebClient is Spring Framework's reactive client for making service to service calls.

WebClient has become a go-to utility for me, however I recently and unexpectedly encountered an issue in the way it handles Java 8 date-time fields that tripped me up, and this post goes into the details.


Happy Path

First the happy path. When using WebClient, Spring Boot advises that a "WebClient.Builder" be injected into a class instead of the "WebClient" itself, and a "WebClient.Builder" is already auto-configured and available for injection.


Consider a fictitious "City" domain and a client to create a "City". "City" has a simple structure, note that the creationDate is a Java8 "Instant" type:

import java.time.Instant

data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long,
    val creationDate: Instant = Instant.now()
)

The client to create an instance of this type looks like this:

import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Mono
import java.net.URI

class CitiesClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {
    fun createCity(city: City): Mono<City> {
        val uri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .bodyValue(city)
            .exchange()
            .flatMap { clientResponse ->
                clientResponse.bodyToMono(City::class.java)
            }
    }
}

See how the intent is expressed in a fluent way. The uri and the headers are first being set, the request body is then put in place and the response is unmarshalled back to a "City" response type.


All well and good. Now, what does a test look like?

I am using the excellent WireMock to bring up a dummy remote service and then using this CitiesClient to send the request, along these lines:

@SpringBootTest 
@AutoConfigureJson
class WebClientConfigurationTest {

    @Autowired
    private lateinit var webClientBuilder: WebClient.Builder

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @Test
    fun testAPost() {
        val dateAsString = "1985-02-01T10:10:10Z"

        val city = City(
            id = 1L, name = "some city",
            country = "some country",
            pop = 1000L,
            creationDate = Instant.parse(dateAsString)
        )
        WIREMOCK_SERVER.stubFor(
            post(urlMatching("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .withHeader("Content-Type", equalTo("application/json"))
                .willReturn(
                    aResponse()
                        .withHeader("Content-Type", "application/json")
                        .withStatus(HttpStatus.CREATED.value())
                        .withBody(objectMapper.writeValueAsString(city))
                )
        )

        val citiesClient = CitiesClient(webClientBuilder, "http://localhost:${WIREMOCK_SERVER.port()}")

        val citiesMono: Mono<City> = citiesClient.createCity(city)

        StepVerifier
            .create(citiesMono)
            .expectNext(city)
            .expectComplete()
            .verify()


        //Ensure that date field is in ISO-8601 format..
        WIREMOCK_SERVER.verify(
            postRequestedFor(urlPathMatching("/cities"))
                .withRequestBody(matchingJsonPath("$.creationDate", equalTo(dateAsString)))
        )
    }

    companion object {
        private val WIREMOCK_SERVER =
            WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().notifier(ConsoleNotifier(true)))

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

With the verification at the end of the test, I want to make sure that the remote service receives the date in ISO-8601 format as "1985-02-01T10:10:10Z". In this instance everything works cleanly and the test passes.

Not so happy path

Consider now a case where I have customized the WebClient.Builder in some form. An example: say I am using a registry service and want to look up a remote service via this registry before making a call - then the WebClient.Builder has to be customized with a "@LoadBalanced" annotation on it - some details here

So say, I have customized WebClient.Builder this way:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder().filter { req, next ->
            LOGGER.error("Custom filter invoked..")
            next.exchange(req)
        }
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

It looks straightforward, however now the previous test fails. Specifically, the format of the creationDate over the wire is not ISO-8601 anymore; the raw request looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": 476100610.000000000
}

vs for a working request:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

See how the date format is different.

Problem

The underlying reason for this issue is simple: Spring Boot adds a bunch of configuration to WebClient.Builder that is lost when I create the bean explicitly myself. Specifically, in this instance there is a Jackson ObjectMapper created under the covers which by default writes dates as timestamps - some details here.
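
Roughly speaking, the ObjectMapper that Spring Boot configures has the Jackson "JavaTimeModule" registered and the "WRITE_DATES_AS_TIMESTAMPS" serialization feature disabled, which is what produces the ISO-8601 form. The following standalone sketch shows the two behaviors side by side (this is not Spring Boot's actual configuration code):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import java.time.Instant

fun main() {
    val instant = Instant.parse("1985-02-01T10:10:10Z")

    // JavaTimeModule registered, but dates are still written as numeric timestamps (the default)
    val timestampMapper = ObjectMapper().registerModule(JavaTimeModule())
    println(timestampMapper.writeValueAsString(instant)) // 476100610.000000000

    // Disabling WRITE_DATES_AS_TIMESTAMPS switches to the ISO-8601 representation
    val isoMapper = ObjectMapper()
        .registerModule(JavaTimeModule())
        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
    println(isoMapper.writeValueAsString(instant)) // "1985-02-01T10:10:10Z"
}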

Solution

Okay, so how do we get back the customizations that Spring Boot makes? I have essentially replicated the behavior of a Spring Boot auto-configuration called "WebClientAutoConfiguration", and it looks like this:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(customizerProvider: ObjectProvider<WebClientCustomizer>): WebClient.Builder {
        val webClientBuilder: WebClient.Builder = WebClient
            .builder()
            .filter { req, next ->
                LOGGER.error("Custom filter invoked..")
                next.exchange(req)
            }

        customizerProvider.orderedStream()
            .forEach { customizer -> customizer.customize(webClientBuilder) }

        return webClientBuilder
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

There is likely a better approach than just replicating this behavior, but this approach works for me.

The posted content now looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

with the date back in the right format.

Conclusion

Spring Boot's auto-configuration for WebClient provides an opinionated set of defaults. If for any reason the WebClient and its builder need to be configured explicitly, be wary of the customizations that Spring Boot adds and replicate them for the customized bean. In my case, the Jackson customization for Java 8 dates was missing in my custom "WebClient.Builder" and had to be explicitly accounted for.

A sample test and a customization are available here

Thursday, December 26, 2019

Project reactor - de-structuring a Tuple

Tuples are simple data structures that hold a fixed set of items, each of a different data type.

Project Reactor provides a Tuple data structure that can hold up to 8 different types.

Tuples are very useful, however one of the issues in using a Tuple is that it is difficult to make out what it holds without de-structuring it at every place it gets used.

Problem

Consider a simple operator, which zips together a String and an integer this way:

Mono<Tuple2<String,  Integer>> tup = Mono.zip(Mono.just("a"), Mono.just(2))

The output is a Tuple holding 2 elements.

Now, the problem that I have is this. I prefer the tuple to be dereferenced into its component elements before doing anything with it.

Say with the previous tuple, I want to generate as many of the strings (the first element of the tuple) as the count (the second element of the tuple), expressed the following way:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany(tup -> {
        return Flux.range(1, tup.getT2()).map(i -> tup.getT1() + i);
    });

The problem here is that when referring to "tup.getT1()" and "tup.getT2()", it is not entirely clear what the tuple holds. Though more verbose, I would prefer doing something like this:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany(tup -> {
        String s = tup.getT1();
        int count = tup.getT2();

        return Flux.range(1, count).map(i -> s + i);
    });

Solution

The approach of de-structuring into explicit variables with meaningful names works, but it is a little verbose. A far better approach is provided by a set of utility functions that Project Reactor comes with, called TupleUtils.

I feel it is best explained using a sample. With TupleUtils, the previous de-structuring looks like this:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany(TupleUtils.function((s, count) ->
            Flux.range(1, count).map(i -> s + i)))

This looks far more concise than explicit de-structuring. There is a bit of cleverness that needs some getting used to though:

The signature of flatMapMany is -

public final <R> Flux<R> flatMapMany(Function<? super T,? extends Publisher<? extends R>> mapper)

TupleUtils provides another indirection which returns the Function required above, through another function:

public static <T1, T2, R> Function<Tuple2<T1, T2>, R> function(BiFunction<T1, T2, R> function) {
    return tuple -> function.apply(tuple.getT1(), tuple.getT2());
}

If you are using Kotlin, there is a simpler approach possible, based on the concept of "Destructuring Declarations". Project Reactor provides a set of Kotlin helper utilities through an additional Gradle dependency:

implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")

With this dependency in place, an equivalent Kotlin code looks like this:

Mono.zip(Mono.just("a"), Mono.just(2))
    .flatMapMany { (s: String, count: Int) ->
        Flux.range(1, count).map { i: Int -> s + i }
    }

See how the tuple has been de-structured directly into named variables. In isolation, the de-structuring looks like this:

val (s: String, count: Int) = tup

Conclusion

It is important to de-structure a tuple into more meaningful variables to improve the readability of the code, and the useful functions provided by TupleUtils as well as the Kotlin extensions help keep the code concise yet readable.

The Java samples are here and the Kotlin samples here.

Tuesday, November 12, 2019

Hash a Json

I recently wrote a simple library to predictably hash a json.

The utility is built on top of the excellent Jackson Json parsing library.


Problem

I needed a hash generated from a fairly large json-based content to later determine whether the content had changed at all. Treating the json as a string is not an option, as formatting and shuffling of keys can skew the results.

Solution

The utility is simple - it traverses the Jackson JsonNode representation of the json:

1. For every object node, it sorts the keys, then traverses the elements and calculates an aggregated hash from all the children
2. For every array node, it traverses the elements and aggregates the hash
3. For every terminal node, it takes the key and value and generates the SHA-256 hash from it


This way the hash is generated for the entire tree.
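
A minimal Kotlin sketch of this traversal - the library itself is implemented differently, and I am assuming Apache Commons Codec's "DigestUtils.sha256Hex" for the hashing:

import com.fasterxml.jackson.databind.JsonNode
import org.apache.commons.codec.digest.DigestUtils.sha256Hex

// Illustrative only - follows the three rules above, not the library's exact code
fun hashOf(node: JsonNode): String =
    when {
        node.isObject -> {
            // Sort the keys, then aggregate each key's hash with the hash of its child
            val aggregated = node.fieldNames().asSequence().sorted()
                .joinToString(separator = "") { key -> sha256Hex(key) + hashOf(node.get(key)) }
            sha256Hex(aggregated)
        }
        node.isArray -> {
            // Aggregate the hash of every element in order
            val aggregated = node.elements().asSequence()
                .joinToString(separator = "") { element -> hashOf(element) }
            sha256Hex(aggregated)
        }
        // Terminal node - hash the text representation of the value
        else -> sha256Hex(node.asText())
    }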

Consider a Jackson Json Node, created the following way in code:

ObjectNode jsonNode = JsonNodeFactory
        .instance
        .objectNode()
        .put("key1", "value1");

jsonNode.set("key2", JsonNodeFactory.instance.objectNode()
        .put("child-key2", "child-value2")
        .put("child-key1", "child-value1")
        .put("child-key3", 123.23f));

jsonNode.set("key3", JsonNodeFactory.instance.arrayNode()
        .add("arr-value1")
        .add("arr-value2"));

String calculatedHash = sha256Hex(
        sha256Hex("key1") + sha256Hex("value1")
                + sha256Hex("key2") + sha256Hex(
                sha256Hex("child-key1") + sha256Hex("child-value1")
                        + sha256Hex("child-key2") + sha256Hex("child-value2")
                        + sha256Hex("child-key3") + sha256Hex("123.23"))
                + sha256Hex("key3") + sha256Hex(
                sha256Hex("arr-value1")
                        + sha256Hex("arr-value2"))
);

Here the json has 3 keys, "key1", "key2", "key3". "key1" holds a primitive text value, "key2" is an object node and "key3" is an array of strings. The calculatedHash shows how the aggregated hash is calculated for the entire tree; the utility follows the same process to aggregate a hash.

If you are interested in giving this a whirl, the library is available in bintray - https://bintray.com/bijukunjummen/repo/json-hash and hosted on github here - https://github.com/bijukunjummen/json-hash

Sunday, September 15, 2019

Unit test for Spring's WebClient

WebClient, to quote its Java documentation, is Spring Framework's "non-blocking, reactive client to perform HTTP requests, exposing a fluent, reactive API over underlying HTTP client libraries such as Reactor Netty".

In my current project I have been using WebClient extensively for making service-to-service calls, and I have found it to be an awesome API - I love its fluent interface.

Consider a remote service which returns a list of "Cities". A code using WebClient looks like this:

...
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import java.net.URI

class CitiesClient(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) {

    fun getCities(): Flux<City> {
        val buildUri: URI = UriComponentsBuilder
                .fromUriString(citiesBaseUrl)
                .path("/cities")
                .build()
                .encode()
                .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.get()
                .uri(buildUri)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany { clientResponse ->
                    clientResponse.bodyToFlux<City>()
                }
    }
}

It is difficult to test a client making use of WebClient though. In this post, I will go over the challenges in testing a client using WebClient and a clean solution.

Challenges in mocking WebClient

An effective unit test of the "CitiesClient" class would require mocking of WebClient and every method call in the fluent interface chain along these lines:

val mockWebClientBuilder: WebClient.Builder = mock()
val mockWebClient: WebClient = mock()
whenever(mockWebClientBuilder.build()).thenReturn(mockWebClient)

val mockRequestSpec: WebClient.RequestBodyUriSpec = mock()
whenever(mockWebClient.get()).thenReturn(mockRequestSpec)
val mockRequestBodySpec: WebClient.RequestBodySpec = mock()

whenever(mockRequestSpec.uri(any<URI>())).thenReturn(mockRequestBodySpec)

whenever(mockRequestBodySpec.accept(any())).thenReturn(mockRequestBodySpec)

val citiesJson: String = this.javaClass.getResource("/sample-cities.json").readText()

val clientResponse: ClientResponse = ClientResponse
        .create(HttpStatus.OK)
        .header("Content-Type","application/json")
        .body(citiesJson).build()

whenever(mockRequestBodySpec.exchange()).thenReturn(Mono.just(clientResponse))

val citiesClient = CitiesClient(mockWebClientBuilder, "http://somebaseurl")

val cities: Flux<City> = citiesClient.getCities()

This makes for an extremely flaky test as any change in the order of calls would result in new mocks that will need to be recorded.

Testing using real endpoints


An approach that works well is to bring up a real server that behaves like the target of the client. Two mock servers that work really well are MockWebServer from the OkHttp library and WireMock. An example with WireMock looks like this:

import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.core.WireMockConfiguration
import org.bk.samples.model.City
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.springframework.http.HttpStatus
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import reactor.test.StepVerifier

class WiremockWebClientTest {

    @Test
    fun testARemoteCall() {
        val citiesJson = this.javaClass.getResource("/sample-cities.json").readText()
        WIREMOCK_SERVER.stubFor(WireMock.get(WireMock.urlMatching("/cities"))
                .withHeader("Accept", WireMock.equalTo("application/json"))
                .willReturn(WireMock.aResponse()
                        .withStatus(HttpStatus.OK.value())
                        .withHeader("Content-Type", "application/json")
                        .withBody(citiesJson)))

        val citiesClient = CitiesClient(WebClient.builder(), "http://localhost:${WIREMOCK_SERVER.port()}")

        val cities: Flux<City> = citiesClient.getCities()
        
        StepVerifier
                .create(cities)
                .expectNext(City(1L, "Portland", "USA", 1_600_000L))
                .expectNext(City(2L, "Seattle", "USA", 3_200_000L))
                .expectNext(City(3L, "SFO", "USA", 6_400_000L))
                .expectComplete()
                .verify()
    }

    companion object {
        private val WIREMOCK_SERVER = WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort())

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

Here a server is brought up at a random port, injected with a behavior, and then the client is tested against this server and validated. This approach works, and there is no muddling with the internals of WebClient in mocking the behavior, but technically this is an integration test and it is slower to execute than a pure unit test.


Unit testing by short-circuiting the remote call

An approach that I have been using recently is to short-circuit the remote call using an ExchangeFunction. An ExchangeFunction represents the actual mechanism of making the remote call and can be replaced with one that responds with what the test expects, the following way:

import org.junit.jupiter.api.Test
import org.springframework.http.HttpStatus
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.ExchangeFunction
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier

class CitiesWebClientTest {

    @Test
    fun testCleanResponse() {
        val citiesJson: String = this.javaClass.getResource("/sample-cities.json").readText()

        val clientResponse: ClientResponse = ClientResponse
                .create(HttpStatus.OK)
                .header("Content-Type","application/json")
                .body(citiesJson).build()
        val shortCircuitingExchangeFunction = ExchangeFunction {
            Mono.just(clientResponse)
        }

        val webClientBuilder: WebClient.Builder = WebClient.builder().exchangeFunction(shortCircuitingExchangeFunction)
        val citiesClient = CitiesClient(webClientBuilder, "http://somebaseurl")

        val cities: Flux<City> = citiesClient.getCities()

        StepVerifier
                .create(cities)
                .expectNext(City(1L, "Portland", "USA", 1_600_000L))
                .expectNext(City(2L, "Seattle", "USA", 3_200_000L))
                .expectNext(City(3L, "SFO", "USA", 6_400_000L))
                .expectComplete()
                .verify()
    }
}

The WebClient is injected with an ExchangeFunction which simply returns a response with the expected behavior of the remote server. This short-circuits the entire remote call and allows the client to be tested comprehensively. The approach does depend on a little knowledge of the internals of WebClient, but it is a decent compromise as it runs far faster than a test using WireMock.

This approach is not original though - I have based this test on some of the tests used for testing WebClient itself, for example, the one here


Conclusion

I personally prefer the last approach, it has enabled me to write fairly comprehensive unit tests for a Client making use of WebClient for remote calls. My project with fully working samples is here.

Tuesday, August 6, 2019

Chicken and egg - resolving Spring properties ahead of a test

Consider a service class responsible for making a remote call and retrieving a detail:


...
public class CitiesService {
    private final WebClient.Builder webClientBuilder;
    private final String baseUrl;

    public CitiesService(
            WebClient.Builder webClientBuilder,
            @Value("${cityservice.url}") String baseUrl) {
        this.webClientBuilder = webClientBuilder;
        this.baseUrl = baseUrl;
    }


    public Flux<City> getCities() {
        return this.webClientBuilder.build()
                .get()
....

This is a Spring Bean and resolves the url to call through a property called "cityservice.url".

If I wanted to test this class, an approach that I have been using with WebClient is to start a mock server using the excellent WireMock and test the class against it. A WireMock stub looks like this:

    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().dynamicPort());


    .....

    WIREMOCK_SERVER.stubFor(get(urlEqualTo("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .willReturn(aResponse()
                        .withStatus(200)
                        .withHeader("Content-Type", "application/json")
                        .withBody(resultJson)));

The WireMock server is started at a random port and set up to respond to an endpoint called "/cities". Here is where the chicken and egg problem comes up:

1. The CitiesService class requires a property called "cityservice.url" to be set before starting the test.
2. WireMock is started at a random port, so the url that it responds to - "http://localhost:randomport" - is available only once the test is kicked off.


There are three potential solutions that I can think of to break this circular dependency:

Approach 1: To use a hardcoded port

This approach depends on starting WireMock on a fixed port instead of a dynamic one; this way the property can be set when starting the test, something like this:

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = CitiesServiceHardcodedPortTest.SpringConfig.class,
        properties = "cityservice.url=http://localhost:9876")
public class CitiesServiceHardcodedPortTest {
    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().port(9876));


Here WireMock is started at port 9876 and the property is set at startup to "http://localhost:9876".

This solves the problem; however, it is not CI-server friendly - it is possible for the ports to collide at runtime, which makes for a flaky test.


Approach 2: Not use Spring for test

A better approach is to not use the property, along these lines:

public class CitiesServiceDirectTest {
    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().dynamicPort());

    private CitiesService citiesService;

    @BeforeEach
    public void beforeEachTest() {
        final WebClient.Builder webClientBuilder = WebClient.builder();

        this.citiesService = new CitiesService(webClientBuilder, WIREMOCK_SERVER.baseUrl());
    }


Here the service is being created by explicitly setting the baseUrl in the constructor, thus avoiding the need to set a property ahead of the test.


Approach 3: Application Context Initializer


ApplicationContextInitializer is used for programmatically initializing a Spring ApplicationContext, and it can be used with a test to inject the property before the actual test is executed, along these lines:

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = CitiesServiceSpringTest.SpringConfig.class)
@ContextConfiguration(initializers = {CitiesServiceSpringTest.PropertiesInitializer.class})
public class CitiesServiceSpringTest {
    private static final WireMockServer WIREMOCK_SERVER =
            new WireMockServer(wireMockConfig().dynamicPort());

    @Autowired
    private CitiesService citiesService;

    @Test
    public void testGetCitiesCleanFlow() throws Exception {
        ...
    }



    static class PropertiesInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(ConfigurableApplicationContext applicationContext) {
            TestPropertyValues.of(
                    "cityservice.url=" + "http://localhost:" + WIREMOCK_SERVER.port()
            ).applyTo(applicationContext.getEnvironment());
        }
    }

}

WireMock is started up first, then the Spring context is initialized using the initializer, which injects the "cityservice.url" property using WireMock's dynamic port. This way the property is available for wiring into CitiesService.


Conclusion

I personally prefer Approach 2; however, it is good to have Spring's wiring and the dependent beans created ahead of the test, and if the class utilizes these then I prefer Approach 3. ApplicationContextInitializer provides a good way to break the chicken and egg problem with properties like these, which need to be available ahead of Spring's context getting engaged.

All the code samples are available here:

Approach 1: https://github.com/bijukunjummen/reactive-cities-demo/blob/master/src/test/java/samples/geo/service/CitiesServiceHardcodedPortTest.java
Approach 2: https://github.com/bijukunjummen/reactive-cities-demo/blob/master/src/test/java/samples/geo/service/CitiesServiceDirectTest.java
Approach 3: https://github.com/bijukunjummen/reactive-cities-demo/blob/master/src/test/java/samples/geo/service/CitiesServiceSpringTest.java