Saturday, July 4, 2020

Expressing a conditional expression using Json - Java Implementation

I recently needed to express a conditional expression in a form that both a front-end Javascript application and a back-end Java application could create and read. Expressing the conditional expression as JSON felt logical, and after a quick search, the JsonLogic library appeared to fit exactly what I was looking for.

JsonLogic follows a prefix notation for its expressions, along these lines:

{"operator" : ["values" ... ]}
So, for example, given JSON input data that looks like this:
{
  "a": "hello",
  "b": 1,
  "c": [
    "elem1",
    "elem2"
  ]
}
For equality, an expression using JsonLogic is the following:
{"==" : [ { "var" : "a" }, "hello" ] }
Here the data is looked up using the "var" expression and the equality is checked using the "==" operator. 

Though it is a good fit, I decided to go with an alternate way of expressing the conditional expression, heavily inspired by JsonLogic. So, in my implementation, equality with the sample JSON looks like this:
{
    "equals": [
        "/a", "hello"
    ]
}
Fairly similar; the location of the data is expressed as a JSON Pointer and the operators are textual ("equals" vs "=="). The set of supported features is also much smaller than JsonLogic's, as that sufficed for my needs on the project. So now I have a small Java-based library that supports these simplified conditional expressions, and this post will go into the details of the operators and the usage of the library.



Sample Expressions

Once more to touch on the sample conditional expressions, everything takes the form of:
{"operator" : ["values" ... ]}
A check for equality looks like this:
{
    "equals": [
        "/a", "hello"
    ]
}
Not operator:
{
    "not": [
        {
            "equals": [
                "/a",
                "hello"
            ]
        }
    ]
}
And/Or operator:
{
    "and": [
        {
            "equal": [
                "/a", "hello"
            ]
        },
        {
            "equal": [
                "/b", 1
            ]
        }
    ]
}
There are a few operators that work on collections; for example, to check if "c" in the sample JSON has the elements "elem1" and "elem2":
{
    "contains": [
        "/c", ["elem1", "elem2"]
    ]
}
or to check if the collection has any of the elements "elem1", "elem2":
{
    "containsAnyOf": [
        "/c", ["elem1", "elem2"]
    ]
}

Details of the Library

The Java-based library is built on top of the excellent Jackson JSON parser library, which it uses to parse the expression; the parsed expression is then interpreted by the library. A Gradle-based project can pull in the dependency the following way (published currently to JCenter):
implementation 'com.github.bijukunjummen:json-conditional-expression:0.4.0'
and use the library along these lines, shown here with sample Kotlin code:
val jsonExpressionEvaluator: JsonExpressionEvaluator = JsonExpressionEvaluator(ObjectMapper())
jsonExpressionEvaluator.matches(expression, json) //returns true
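To make the flow a little more concrete, here is a slightly fuller sketch. It assumes that "matches" accepts the expression and the data as JSON strings - the actual signatures in the library may differ:

import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    // a sketch only: expression and data are assumed to be passed as JSON strings
    val expression = """{ "equals": [ "/a", "hello" ] }"""
    val json = """{ "a": "hello", "b": 1, "c": ["elem1", "elem2"] }"""

    val evaluator = JsonExpressionEvaluator(ObjectMapper())
    println(evaluator.matches(expression, json)) // prints true
}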

Conclusion

The conditional expression and the corresponding Java-based interpreter are fairly simple and have sufficed for the kind of operators that I needed for my project. However, I will be more than happy to extend and support more extensive operators if there is enough interest in using the library.


Reference

1. JsonLogic, which provided the inspiration for using prefix notation to represent a conditional expression

Monday, May 25, 2020

AWS DynamoDB version field using AWS SDK for Java 2

It is useful to have a version attribute on any entity saved to an AWS DynamoDB database which is simply a numeric indication of the number of times the entity has been modified. When the entity is first created it can be set to 1 and then incremented on every update. 

The benefit is immediate - an indicator of the number of times an entity has been modified which can be used for auditing the entity. Also, an additional use is for optimistic locking where an entity is allowed to be updated only if the holder updating it has the right version of the entity.


This post will go into the details of how to introduce such a field with the DynamoDB-related libraries of AWS SDK 2.

Model

Consider a model called Hotel which is being persisted into a DynamoDB table. In Kotlin, it can be represented using the following data class:

data class Hotel(
    val id: String = UUID.randomUUID().toString(),
    val name: String,
    val address: String? = null,
    val state: String? = null,
    val zip: String? = null,
    val version: Long = 1L
)
A version field has been introduced in this model with an initial value of 1. The aim will be to save this field as-is and then let DynamoDB atomically increment this field each time the entity is updated.
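As a sketch of what the initial save might look like with the AWS SDK 2, borrowing the "TABLE_NAME" constant, "dynamoClient" and "hotel" references from the update code shown later in this post:

import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest

// a sketch: persist the entity with the version written as-is (1 on creation)
val putItemRequest: PutItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            "id" to AttributeValue.builder().s(hotel.id).build(),
            "name" to AttributeValue.builder().s(hotel.name).build(),
            "version" to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()

dynamoClient.putItem(putItemRequest) // returns a CompletableFuture with the async client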

As the fields in this model get changed, I would like the version to be incremented with each update.

Local version of DynamoDB

It is useful to have DynamoDB running on the local machine; that way there is no need to create real DynamoDB tables in AWS.

There are multiple ways of doing this. One is to use a docker version of DynamoDB Local, which can be started up the following way to listen on port 4569:


docker run -p 4569:8000 amazon/dynamodb-local:1.13
My personal preference is to use localstack and the instructions at the site have different ways to start it up. I normally use docker-compose to bring it up. One of the reasons to use localstack over DynamoDB Local is that localstack provides a comprehensive set of AWS services for local testing and not just DynamoDB.


Quick Demo

I have the entire code available in my github repo here - https://github.com/bijukunjummen/boot-with-dynamodb

Once the application is brought up using the local version of DynamoDB, an entity can be created using the following httpie request:



http :9080/hotels id=4 name=name address=address zip=zip state=OR
With a response, where the version field is set to 1:
{
    "address": "address",
    "id": "4",
    "name": "name",
    "state": "OR",
    "version": 1,
    "zip": "zip"
}
Then if the name is updated for the entity:
http PUT :9080/hotels/4 name=name1 address=address zip=zip state=OR version=1
the version field gets updated to 2 and so on:
{
    "address": "address",
    "id": "4",
    "name": "name1",
    "state": "OR",
    "version": 2,
    "zip": "zip"
}
Also note that if a wrong version number is provided during an update, the call fails, as optimistic locking is in place using this version field.

Implementing the version field

Implementing the version field depends on the powerful UpdateItem API provided by DynamoDB. One of the features of the UpdateItem API is that it takes an "UpdateExpression", a DSL that describes how the different DynamoDB attributes should be updated.

The raw request to AWS DynamoDB looks like this:

{
  "TableName": "hotels",
  "Key": {
    "id": {
      "S": "1"
    }
  },
  "UpdateExpression": "\nSET #name=:name,\n #state=:state,\naddress=:address,\nzip=:zip\nADD version :inc\n ",
  "ExpressionAttributeNames": {
    "#state": "state",
    "#name": "name"
  },
  "ExpressionAttributeValues": {
    ":name": {
      "S": "testhotel"
    },
    ":address": {
      "S": "testaddress"
    },
    ":state": {
      "S": "OR"
    },
    ":zip": {
      "S": "zip"
    },
    ":inc": {
      "N": "1"
    }
  }
}               
From the article's perspective, specifically focus on "ADD version :inc", an expression that tells AWS DynamoDB to increment the value of version by the ":inc" value, which is provided separately via "ExpressionAttributeValues" as "1". Dealing with the raw API in its JSON form is daunting; that is where the Software Development Kit (SDK) that AWS provides comes in. AWS SDK for Java 2 is a rewrite of the AWS SDKs with a focus on using the latest Java features and non-blocking IO over the wire. Using AWS SDK for Java 2, an "UpdateItem" call looks like this (using Kotlin code):
val updateItemRequest = UpdateItemRequest.builder()
    .tableName(TABLE_NAME)
    .key(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build()
        )
    )
    .updateExpression(
    """
        SET #name=:name,
        #state=:state,
        address=:address,
        zip=:zip 
        ADD version :inc
    """
    )
    .conditionExpression("version = :version")
    .expressionAttributeValues(
        mapOf(
            ":${NAME}" to AttributeValue.builder().s(hotel.name).build(),
            ":${ZIP}" to AttributeValue.builder().s(hotel.zip).build(),
            ":${STATE}" to AttributeValue.builder().s(hotel.state).build(),
            ":${ADDRESS}" to AttributeValue.builder().s(hotel.address).build(),
            ":${VERSION}" to AttributeValue.builder().n(hotel.version.toString()).build(),
            ":inc" to AttributeValue.builder().n("1").build()
        )
    )
    .expressionAttributeNames(
        mapOf(
            "#name" to "name",
            "#state" to "state"
        )
    )
    .build()

val updateItem: CompletableFuture<UpdateItemResponse> = dynamoClient.updateItem(updateItemRequest)

return Mono.fromCompletionStage(updateItem)
    .flatMap {
        getHotel(hotel.id)
    }
The update expression in this call sets all the existing fields to their new values and increments the version attribute by 1. Another thing to note about this call is the "conditionExpression", which is essentially a way to tell DynamoDB to update the attributes only if a condition matches, in this specific instance if the existing value of version matches. This provides a neat way to support optimistic locking on the record.
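When the condition does not match, the SDK surfaces a ConditionalCheckFailedException. A sketch of handling it in the reactive flow above - the mapping to IllegalStateException is just an illustrative choice:

import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException

// a sketch: map a failed version check to a more domain-friendly error
return Mono.fromCompletionStage(dynamoClient.updateItem(updateItemRequest))
    .onErrorMap(ConditionalCheckFailedException::class.java) { e ->
        IllegalStateException("Version mismatch for hotel ${hotel.id}", e)
    }
    .flatMap { getHotel(hotel.id) }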

Conclusion

A lot of details here - the easiest way to get a feel for it is by trying out the code which is available in my github repository here - https://github.com/bijukunjummen/boot-with-dynamodb. The readme has good details on how to run it in a local environment. 

AWS DynamoDB provides a neat way to manage a version field on entities, ensuring that it is atomically updated, and provides a way for it to be used for optimistic locking.

Thursday, April 9, 2020

Processing SQS Messages using Spring Boot and Project Reactor - Part 2

This is a follow-up to my blog post about processing SQS messages efficiently using Spring Boot and Project Reactor.

There are a few gaps in the approach that I listed in the first part:

1. Handling failures in SQS client calls
2. The approach processes only 1 message from SQS at a time - how can it be parallelized?
3. It does not handle errors - any error in the pipeline would break the entire process and stop the reading of newer messages from the queue.


Recap

Just to recap, the previous post demonstrates creating a pipeline to process messages from an AWS SQS queue using the excellent Project Reactor.


The end result of that exercise was a pipeline which reads messages off the queue, processes them and deletes them once processed (the code for it is shown in the sections below).

Given this pipeline, let me now go over how to bridge the gaps:

Handling SQS Client Failures


This is the function that generates the stream of messages read from SQS.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())

Now consider a case where the "sqsClient" above has a connectivity issue. The behavior with Flux is that in case of an error the stream is terminated. This, of course, will not do for a service whose job is to process messages as long as the service is running.

The fix is to simply retry the processing flow in case of errors.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()

This would result in Flux re-establishing the stream of messages in case of any errors up to this point.
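A plain "retry()" resubscribes immediately, which can hammer SQS in a tight loop during an outage. Assuming reactor-core 3.3.4 or later, where the reactor.util.retry.Retry builder is available, a sketch of a variation that backs off between attempts:

import reactor.util.retry.Retry
import java.time.Duration

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))) // wait between attempts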

Processing Messages in Parallel

Project Reactor provides a few ways of parallelizing a processing pipeline. My first attempt at processing in parallel was to add a "subscribeOn" method to the processing chain.

Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .subscribeOn(Schedulers.newElastic("sub"))

However, this is not quite how "subscribeOn" works. An output when I send a few messages to this pipeline is the following:

2020-04-07 20:52:53.241  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.434  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.493  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.538  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.609  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-07 20:52:53.700  INFO 1137 --- [          sub-3] sample.msg.MessageListenerRunner         : Processed Message hello

The "sub-3" above is the name of the thread processing the message, and it looks like all the messages are getting processed on the "sub-3" thread and on no other threads!

subscribeOn simply changes the execution context by borrowing "a thread" from this scheduler pool and does not use all the threads in the pool itself.

So how can the processing be parallelized? This StackOverflow answer provides a very good approach that I am using here: essentially, use a flatMap operator and add the "subscribeOn" operator inside the "flatMap" operator.

This operator eagerly subscribes to its inner publishers and then flattens the result. The trick is that each inner subscriber can be provided its own scheduler, and each subscription will end up using a thread from the scheduler pool. The number of these concurrent subscribers can be controlled using a "concurrency" parameter passed to the flatMap operator.


Flux.generate { sink: SynchronousSink<List<Message>> ->
    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .maxNumberOfMessages(5)
        .waitTimeSeconds(10)
        .build()

    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
    sink.next(messages)
}
    .flatMapIterable(Function.identity())
    .retry()
    .flatMap({ (message: String, deleteHandle: () -> Unit) ->
        task(message)
            .then(Mono.fromSupplier { Try.of { deleteHandle() } })
            .then()
            .subscribeOn(taskScheduler)
    }, concurrency)

and an output when processing multiple messages looks like this -

2020-04-08 21:03:24.582  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.815  INFO 17541 --- [  taskHandler-4] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-5] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-6] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.816  INFO 17541 --- [  taskHandler-7] sample.msg.MessageListenerRunner         : Processed Message hello
2020-04-08 21:03:24.817  INFO 17541 --- [  taskHandler-8] sample.msg.MessageListenerRunner         : Processed Message hello
See how there is more than one thread name (taskHandler-*) in the logs now!


Handling downstream errors

One of my previous fixes, with the "retry" operator, was about handling upstream errors with sqsClient connectivity. However, if any of the steps in the pipeline throws an error while messages are being processed, the entire pipeline fails. So it is important to guard EVERY step against failure. A neat way that I have been ensuring that errors don't propagate out is to use the excellent vavr library and its "Try" type. The Try type holds one of two outcomes - a successful one (Success) or an exception (Failure). This allows the rest of the pipeline to act on the outcome of the previous step in a measured way:

.flatMap({ (message: String, deleteHandle: () -> Unit) ->
    task(message)
        .then(Mono.fromSupplier { Try.of { deleteHandle() } })
        .doOnNext { t ->
            t.onFailure { e -> LOGGER.error(e.message, e) }
        }
        .then()
        .subscribeOn(taskScheduler)
}, concurrency)

The above snippet demonstrates an approach where I know that "deleteHandle", which is responsible for deleting a message, can throw an exception. Try captures this, and if there is an error it is logged; this way the exception does not short-circuit the flow of messages.



Conclusion

My initial thinking was that just because I had taken a reactive approach to processing messages I would get a huge boost in my SQS message processing pipeline. However, my learning has been that, just like everything else, it requires careful understanding and tuning for a Project Reactor based stream to process messages efficiently. I am sure there are a few more lessons for me to learn and I will be documenting those as I do.

This entire sample is available in my github repository here - https://github.com/bijukunjummen/boot-with-sns-sqs

Sunday, March 22, 2020

Processing SQS Messages using Spring Boot and Project Reactor

I recently worked on a project where I had to efficiently process a large number of messages streaming in through an AWS SQS queue. In this post (and potentially one more), I will go over the approach that I took to process the messages using the excellent Project Reactor.



The following is the kind of set-up that I am aiming for: messages published to an SNS topic flow to an SQS queue, and the Spring Boot application consumes and processes them off the queue.

Setting up a local AWS Environment

Before I jump into the code, let me get some preliminaries out of the way. First, how do you get a local version of SNS and SQS? One of the easiest ways is to use localstack. I use a docker-compose version of it described here

The second utility that I will be using is the AWS CLI. This website has details on how to install it locally.

Once both of these utilities are in place, a quick test should validate the setup:

# Create a queue
aws --endpoint http://localhost:4576 sqs create-queue --queue-name test-queue

# Send a sample message
aws --endpoint http://localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"

# Receive the message
aws --endpoint http://localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue


Basics of Project Reactor


Project Reactor implements the Reactive Streams specification and provides a way of handling streams of data across asynchronous boundaries that respects backpressure. A lot of words here but in essence think of it this way:
1. SQS Produces data
2. The application is going to consume and process it as a stream of data
3. The application should consume data at a pace that is sustainable - too much data should not be pumped in. This is formally referred to as "Backpressure"



AWS SDK 2

The library that I will be using to consume AWS SQS data is the AWS SDK 2. The async version of the library uses non-blocking IO under the covers.

The library offers both a sync version of making calls as well as an async version. Consider the synchronous way to fetch records from an SQS queue:

import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import software.amazon.awssdk.services.sqs.SqsClient

val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()

Here "software.amazon.awssdk.services.sqs.SqsClient" is being used for querying sqs and retrieving a batch of results synchronously. An async result, on the other hand, looks like this:



val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: CompletableFuture<List<Message>> = sqsAsyncClient
    .receiveMessage(receiveMessageRequest)
    .thenApply { result -> result.messages() }


The output is now a "CompletableFuture".
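A CompletableFuture adapts cleanly into the Reactor world; a small sketch using the "messages" future from above:

// a sketch: bridge the CompletableFuture-based result into a Mono
val messagesMono: Mono<List<Message>> = Mono.fromFuture(messages)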

Infinite loop and no backpressure

My first attempt at creating a stream (Flux) of messages is fairly simple - an infinite loop that polls AWS SQS and creates a Flux from it using the "Flux.create" operator, this way:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.create { sink: FluxSink<List<Message>> ->
            while (running) {
                try {
                    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(10)
                        .build()

                    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
                    LOGGER.info("Received: $messages")
                    sink.next(messages)
                } catch (e: InterruptedException) {
                    LOGGER.error(e.message, e)
                } catch (e: Exception) {
                    LOGGER.error(e.message, e)
                }
            }
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that there is an infinite loop that checks for new messages using long-polling. Messages may not be available at every poll, in which case an empty list is added to the stream.

This list of at most 5 messages is then mapped to a stream of individual messages using the "flatMapIterable" operator, which is further mapped by extracting the message from the SNS wrapper (as a message gets forwarded from SNS to SQS, SNS adds a wrapper to the message); the message and a way to delete it (the deleteHandle) once it is successfully processed are returned as a Pair.
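As a sketch of what "readSnsNotification" might look like, assuming Jackson is available and that SnsNotification is a simple holder of the inner message:

import com.fasterxml.jackson.databind.ObjectMapper

// assumption: a simple holder for the unwrapped message
data class SnsNotification(val message: String)

private val objectMapper = ObjectMapper()

// a sketch: an SNS-to-SQS forwarded body is a JSON envelope with the
// original payload in a "Message" field
private fun readSnsNotification(body: String): SnsNotification {
    val node = objectMapper.readTree(body)
    return SnsNotification(message = node.get("Message").asText())
}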


This approach works perfectly fine... but imagine a case where a huge number of messages have come in; since the loop is not really aware of the throughput downstream, it will keep pumping data to the stream. The default behavior is for the intermediate operators to buffer this incoming data based on how the final consumer is consuming it. Since this buffer is unbounded, it is possible that the system may reach an unsustainable state.


Backpressure aware stream

The fix is to use a different operator to generate the stream of data - Flux.generate.
Using this operator the code looks like this:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.generate { sink: SynchronousSink<List<Message>> ->
            val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(5)
                .waitTimeSeconds(10)
                .build()

            val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
            LOGGER.info("Received: $messages")
            sink.next(messages)
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that the block passed to the "Flux.generate" operator is repeatedly called - similar to the while loop, in each loop one item is expected to be added to the stream. In this instance, the item added to the stream happens to be a list which like before is broken down into individual messages.

How does backpressure work in this scenario? Again, consider the case where the downstream consumer is processing at a slower rate than the generating end. In this case, Flux slows down the rate at which the generate operator is called, thus being considerate of the throughput of the downstream system.
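This demand-driven behavior can be seen with a small standalone sketch (not part of the SQS pipeline) - the generator block runs only as fast as the slow downstream consumer requests data:

import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.time.Duration

fun main() {
    Flux.generate { sink: SynchronousSink<Long> -> sink.next(System.currentTimeMillis()) }
        .limitRate(2)                          // request at most 2 elements at a time upstream
        .delayElements(Duration.ofMillis(500)) // simulate a slow consumer
        .take(6)
        .doOnNext { value -> println("Consumed: $value") }
        .blockLast()
}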

Conclusion

This sets up a good pipeline for processing messages from SQS; there are a few more nuances to processing messages in parallel later in the stream, which I will cover in a future post.

The codebase of this example is available in my github repository here - https://github.com/bijukunjummen/boot-with-sns-sqs. The code has a complete pipeline which includes processing the message and deleting it once processed.

Tuesday, February 25, 2020

Project Reactor expand method

One of my colleagues at work recently introduced me to the expand operator of the Project Reactor types and in this post I want to cover a few ways in which I have used it.


Unrolling a Paginated Result

Consider a Spring Data based repository on a model called City:

import org.springframework.data.jpa.repository.JpaRepository;
import samples.geo.domain.City;

public interface CityRepo extends JpaRepository<City, Long> {
}
This repository provides a way to retrieve the paginated result, along the following lines:

cityRepo.findAll(PageRequest.of(0, 5))

Now, if I were to unroll multiple pages into a result, the way to do it would be the following kind of loop:

var pageable: Pageable = PageRequest.of(0, 5)
do {
    var page: Page<City> = cityRepo.findAll(pageable)
    page.content.forEach { city -> LOGGER.info("City $city") }
    pageable = page.nextPageable()
} while (page.hasNext())


An equivalent unroll of a paginated result can be done using the Reactor expand operator the following way:

val result: Flux<City> =
    Mono
        .fromSupplier { cityRepo.findAll(PageRequest.of(0, 5)) }
        .expand { page ->
            if (page.hasNext())
                Mono.fromSupplier { cityRepo.findAll(page.nextPageable()) }
            else
                Mono.empty()
        }
        .flatMap { page -> Flux.fromIterable(page.content) }

result.subscribe(
    { page -> LOGGER.info("City ${page}") },
    { t -> t.printStackTrace() }
)


Here the first page of results expands to the second page, the second page to the third page and so on until there are no pages to retrieve.

Traversing a Tree


Consider a node in a tree structure represented by the following model:

data class Node(
    val id: String,
    val nodeRefs: List<String>,
)

Sample data shaped like the following tree - a root node "1" with children "1-1" and "1-2", each of which has two children of its own - can be traversed using a call which looks like this:

val rootMono: Mono<Node> = nodeService.getNode("1")
val expanded: Flux<Node> = rootMono.expand { node ->
    Flux.fromIterable(node.nodeRefs)
        .flatMap { nodeRef -> nodeService.getNode(nodeRef) }
}
expanded.subscribe { node -> println(node) }

This is a breadth-first expansion; the output looks like this:

Node-1
Node-1-1
Node-1-2
Node-1-1-1
Node-1-1-2
Node-1-2-1
Node-1-2-2

An expandDeep variation would traverse it depth-first.
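As a sketch, the depth-first variant just swaps the operator; with the same sample data it would emit Node-1, Node-1-1, Node-1-1-1, Node-1-1-2, Node-1-2 and so on:

// a sketch: same traversal as before, depth-first via expandDeep
val expandedDeep: Flux<Node> = rootMono.expandDeep { node ->
    Flux.fromIterable(node.nodeRefs)
        .flatMap { nodeRef -> nodeService.getNode(nodeRef) }
}
expandedDeep.subscribe { node -> println(node) }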

Monday, January 20, 2020

Spring WebClient and Java date-time fields

WebClient is Spring Framework's reactive client for making service to service calls.

WebClient has become a go-to utility for me; however, I unexpectedly encountered an issue recently in the way it handles Java 8 time fields that tripped me up, and this post goes into those details.


Happy Path

First the happy path. When using WebClient, Spring Boot advises that a "WebClient.Builder" be injected into a class instead of the "WebClient" itself, and a WebClient.Builder is already auto-configured and available for injection.


Consider a fictitious "City" domain and a client to create a "City". "City" has a simple structure, note that the creationDate is a Java8 "Instant" type:

import java.time.Instant

data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long,
    val creationDate: Instant = Instant.now()
)

The client to create an instance of this type looks like this:

class CitiesClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {
    fun createCity(city: City): Mono<City> {
        val uri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .bodyValue(city)
            .exchange()
            .flatMap { clientResponse ->
                clientResponse.bodyToMono(City::class.java)
            }
    }
}

See how the intent is expressed in a fluent way. The uri and the headers are first being set, the request body is then put in place and the response is unmarshalled back to a "City" response type.


All well and good. Now, what does a test look like?

I am using the excellent Wiremock to bring up a dummy remote service and using this CitiesClient to send the request, along these lines:

@SpringBootTest 
@AutoConfigureJson
class WebClientConfigurationTest {

    @Autowired
    private lateinit var webClientBuilder: WebClient.Builder

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @Test
    fun testAPost() {
        val dateAsString = "1985-02-01T10:10:10Z"

        val city = City(
            id = 1L, name = "some city",
            country = "some country",
            pop = 1000L,
            creationDate = Instant.parse(dateAsString)
        )
        WIREMOCK_SERVER.stubFor(
            post(urlMatching("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .withHeader("Content-Type", equalTo("application/json"))
                .willReturn(
                    aResponse()
                        .withHeader("Content-Type", "application/json")
                        .withStatus(HttpStatus.CREATED.value())
                        .withBody(objectMapper.writeValueAsString(city))
                )
        )

        val citiesClient = CitiesClient(webClientBuilder, "http://localhost:${WIREMOCK_SERVER.port()}")

        val citiesMono: Mono<City> = citiesClient.createCity(city)

        StepVerifier
            .create(citiesMono)
            .expectNext(city)
            .expectComplete()
            .verify()


        //Ensure that date field is in ISO-8601 format..
        WIREMOCK_SERVER.verify(
            postRequestedFor(urlPathMatching("/cities"))
                .withRequestBody(matchingJsonPath("$.creationDate", equalTo(dateAsString)))
        )
    }

    companion object {
        private val WIREMOCK_SERVER =
            WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().notifier(ConsoleNotifier(true)))

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

In the verification step at the end of the test, I want to make sure that the remote service receives the date in ISO-8601 format as "1985-02-01T10:10:10Z". In this instance everything works cleanly and the test passes.

Not so happy path

Consider now a case where I have customized the WebClient.Builder in some form. An example: say I am using a registry service and want to look up a remote service via this registry before making a call; then the WebClient has to be customized to add a "@LoadBalanced" annotation on it - some details here

So say, I have customized WebClient.Builder this way:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder().filter { req, next ->
            LOGGER.error("Custom filter invoked..")
            next.exchange(req)
        }
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

It looks straightforward; however, now the previous test fails. Specifically, the date format of the creationDate over the wire is not ISO-8601 anymore. The raw request looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": 476100610.000000000
}

vs for a working request:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

See how the date format is different.

Problem

The underlying reason for this issue is simple: Spring Boot adds a bunch of configuration on WebClient.Builder that is lost when I create the bean explicitly myself. Specifically, in this instance there is a Jackson ObjectMapper created under the covers which by default writes dates as timestamps - some details here.
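A minimal sketch of that default behavior, assuming the jackson-datatype-jsr310 module is on the classpath (Spring Boot's auto-configured ObjectMapper effectively applies the second configuration):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import java.time.Instant

fun main() {
    val instant = Instant.parse("1985-02-01T10:10:10Z")

    // with the JavaTimeModule registered but timestamps left on (the default),
    // an Instant is written as a numeric value
    val timestampMapper = ObjectMapper().registerModule(JavaTimeModule())
    println(timestampMapper.writeValueAsString(instant)) // 476100610.000000000

    // disabling WRITE_DATES_AS_TIMESTAMPS yields the ISO-8601 form
    val isoMapper = ObjectMapper()
        .registerModule(JavaTimeModule())
        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
    println(isoMapper.writeValueAsString(instant)) // "1985-02-01T10:10:10Z"
}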

Solution

Okay, so how do we get back the customizations that Spring Boot makes? I have essentially replicated the behavior of a Spring Boot auto-configuration called "WebClientAutoConfiguration", and it looks like this:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(customizerProvider: ObjectProvider<WebClientCustomizer>): WebClient.Builder {
        val webClientBuilder: WebClient.Builder = WebClient
            .builder()
            .filter { req, next ->
                LOGGER.error("Custom filter invoked..")
                next.exchange(req)
            }

        customizerProvider.orderedStream()
            .forEach { customizer -> customizer.customize(webClientBuilder) }

        return webClientBuilder;
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

There is likely a better approach than just replicating this behavior, but this approach works for me.

The posted content now looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

with the date back in the right format.

Conclusion

Spring Boot's auto-configuration for WebClient provides an opinionated set of defaults. If for any reason the WebClient and its builder need to be configured explicitly, then be wary of the customizations that Spring Boot adds and replicate them for the customized bean. In my case, the Jackson customization for Java 8 dates was missing in my custom "WebClient.Builder" and had to be explicitly accounted for.

A sample test and a customization are available here