Sunday, March 22, 2020

Processing SQS Messages using Spring Boot and Project Reactor

I recently worked on a project where I had to efficiently process a large number of messages streaming in through an AWS SQS queue. In this post (and potentially one more), I will go over the approach that I took to process the messages using the excellent Project Reactor.



The following is the kind of set-up that I am aiming for: messages published to an SNS topic are forwarded to an SQS queue, and a Spring Boot application consumes them off the queue.



Setting up a local AWS Environment

Before I jump into the code, let me get some preliminaries out of the way. First, how do you get a local version of SNS and SQS? One of the easiest ways is to use localstack. I use a docker-compose version of it, described here.

The second utility that I will be using is the AWS CLI. This website has details on how to install it locally.

Once both of these utilities are in place, a quick test should validate the setup:

# Create a queue
aws --endpoint http://localhost:4576 sqs create-queue --queue-name test-queue

# Send a sample message
aws --endpoint http://localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"

# Receive the message
aws --endpoint http://localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue


Basics of Project Reactor


Project Reactor implements the Reactive Streams specification and provides a way of handling streams of data across asynchronous boundaries while respecting backpressure. That is a lot of words, but in essence think of it this way:
1. SQS produces data
2. The application consumes and processes it as a stream of data
3. The application should consume data at a sustainable pace - too much data should not be pumped in. This is formally referred to as "backpressure"
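
Reactor manages this for you, but the idea can be sketched with the JDK alone. The following is a rough stdlib analogy (not Reactor itself, and the names are mine): a small bounded buffer between producer and consumer blocks the producer whenever the consumer falls behind, rather than letting data pile up without limit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Rough analogy for backpressure: a bounded queue blocks the producer
// when the consumer falls behind, instead of buffering without limit.
public class BackpressureSketch {
    public static int produceAndConsume(int messageCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); // bounded buffer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    queue.put("message-" + i); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int consumed = 0;
        try {
            while (consumed < messageCount) {
                queue.take(); // the consumer drains at its own pace
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

With an unbounded queue the producer would race ahead; the bounded capacity is what makes the producer's pace track the consumer's.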



AWS SDK 2

The library that I will be using to consume AWS SQS data is the AWS SDK for Java 2, which uses non-blocking IO under the covers.

The library offers both a synchronous and an asynchronous way of making calls. Consider the synchronous way to fetch records from an SQS queue:

import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()

Here "software.amazon.awssdk.services.sqs.SqsClient" is used for querying SQS and retrieving a batch of results synchronously. An async call, on the other hand, looks like this:



val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(5)
    .waitTimeSeconds(10)
    .build()

val messages: CompletableFuture<List<Message>> = sqsAsyncClient
    .receiveMessage(receiveMessageRequest)
    .thenApply { result -> result.messages() }


The output is now a "CompletableFuture".
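
To see the shape of this without any AWS dependency, here is a minimal sketch using only java.util.concurrent (the message list is faked): the future completes later, and "thenApply" transforms its result without blocking the caller.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Minimal sketch of the same CompletableFuture composition, with a faked
// async "fetch" standing in for the SDK call.
public class AsyncSketch {
    public static CompletableFuture<Integer> fetchAndCount() {
        CompletableFuture<List<String>> pendingMessages =
                CompletableFuture.supplyAsync(() -> List.of("msg-1", "msg-2", "msg-3"));
        // Analogous to .thenApply { result -> result.messages() } on the SDK future
        return pendingMessages.thenApply(List::size);
    }
}
```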

Infinite loop and no backpressure

My first attempt at creating a stream (Flux) of messages is fairly simple - an infinite loop that polls AWS SQS and creates a Flux from it using the "Flux.create" operator, this way:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.create { sink: FluxSink<List<Message>> ->
            while (running) {
                try {
                    val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .waitTimeSeconds(10)
                        .build()

                    val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
                    LOGGER.info("Received: $messages")
                    sink.next(messages)
                } catch (e: InterruptedException) {
                    LOGGER.error(e.message, e)
                } catch (e: Exception) {
                    LOGGER.error(e.message, e)
                }
            }
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that there is an infinite loop that checks for new messages using long-polling. Messages may not be available at every poll, in which case an empty list is added to the stream.

This list of at most 5 messages is then mapped to a stream of individual messages using the "flatMapIterable" operator. Each message is further mapped by extracting the body from the SNS wrapper (as the message gets forwarded from SNS to SQS, SNS adds a wrapper to it), and a way to delete the message (via its receipt handle) once it is successfully processed is returned as a Pair.
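
The flattening step can be illustrated with plain java.util.stream (a sketch with made-up message bodies): each polled batch is a list, and flatMap turns the stream of lists into a stream of individual elements, which is what "flatMapIterable" does on a Flux.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of flatMapIterable with java.util.stream: a stream of batches
// (lists of messages) becomes a stream of individual messages.
public class FlattenSketch {
    public static List<String> flattenBatches(List<List<String>> batches) {
        return batches.stream()
                .flatMap(List::stream) // List<List<String>> -> Stream<String>
                .collect(Collectors.toList());
    }

    public static List<String> demo() {
        // An empty batch (an empty poll) simply contributes nothing
        return flattenBatches(List.of(List.of("msg-1", "msg-2"), List.of(), List.of("msg-3")));
    }
}
```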


This approach works perfectly fine... but imagine a case where a huge number of messages have come in. Since the loop is not really aware of the throughput downstream, it will keep pumping data into the stream. The default behavior is for the intermediate operators to buffer this incoming data as the final consumer consumes it. Since this buffer is unbounded, the system may reach an unsustainable state.


Backpressure aware stream

The fix is to use a different operator to generate the stream of data - Flux.generate.
Using this operator the code looks like this:

fun listen(): Flux<Pair<String, () -> Unit>> {
    return Flux.generate { sink: SynchronousSink<List<Message>> ->
            val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(5)
                .waitTimeSeconds(10)
                .build()

            val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
            LOGGER.info("Received: $messages")
            sink.next(messages)
        }
        .flatMapIterable(Function.identity())
        .doOnError { t: Throwable -> LOGGER.error(t.message, t) }
        .retry()
        .map { snsMessage: Message ->
            val snsMessageBody: String = snsMessage.body()
            val snsNotification: SnsNotification = readSnsNotification(snsMessageBody)
            snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) }
        }
}

The way this works is that the block passed to the "Flux.generate" operator is repeatedly called - similar to the while loop. In each invocation, one item is expected to be added to the stream; in this instance, the item added happens to be a list which, like before, is broken down into individual messages.

How does backpressure work in this scenario? Again, consider the case where the downstream consumer is processing at a slower rate than the generating end. In this case, Flux itself slows down the rate at which the generate operator is called, thus respecting the throughput of the downstream system.
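
The difference between the two operators can be sketched with a plain Iterator (an analogy, not Reactor): in a pull-based source, a poll happens only when the consumer asks for the next item, so a slow consumer automatically slows down the polling.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Pull-based generation in miniature: nothing is "polled" ahead of demand,
// each next() call stands in for one invocation of the generate callback.
public class GenerateSketch {
    public static List<String> consume(int batches) {
        int[] polls = {0};
        Iterator<String> source = new Iterator<>() {
            @Override
            public boolean hasNext() {
                return true; // infinite source, like the repeated SQS poll
            }

            @Override
            public String next() {
                return "batch-" + polls[0]++; // one poll per request, on demand
            }
        };
        List<String> consumed = new ArrayList<>();
        for (int i = 0; i < batches; i++) {
            consumed.add(source.next()); // the consumer sets the pace
        }
        return consumed;
    }
}
```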

Conclusion

This sets up a good pipeline for processing messages from SQS. There are a few more nuances to this - processing messages in parallel later in the stream - which I will cover in a future post.

The codebase of this example is available in my github repository here - https://github.com/bijukunjummen/boot-with-sns-sqs. The code has a complete pipeline which includes processing the message and deleting it once processed.

Tuesday, February 25, 2020

Project Reactor expand method

One of my colleagues at work recently introduced me to the expand operator of the Project Reactor types and in this post I want to cover a few ways in which I have used it.


Unrolling a Paginated Result

Consider a Spring Data based repository on a model called City:

import org.springframework.data.jpa.repository.JpaRepository;
import samples.geo.domain.City;

public interface CityRepo extends JpaRepository<City, Long> {
}

This repository provides a way to retrieve a paginated result, along the following lines:

cityRepo.findAll(PageRequest.of(0, 5))

Now, if I were to unroll multiple pages into a result, the way to do it would be a loop like the following:

var pageable: Pageable = PageRequest.of(0, 5)
do {
    var page: Page<City> = cityRepo.findAll(pageable)
    page.content.forEach { city -> LOGGER.info("City $city") }
    pageable = page.nextPageable()
} while (page.hasNext())


An equivalent unroll of a paginated result can be done using the Reactor expand operator the following way:

val result: Flux<City> =
    Mono
        .fromSupplier { cityRepo.findAll(PageRequest.of(0, 5)) }
        .expand { page ->
            if (page.hasNext())
                Mono.fromSupplier { cityRepo.findAll(page.nextPageable()) }
            else
                Mono.empty()
        }
        .flatMap { page -> Flux.fromIterable(page.content) }

result.subscribe(
    { city -> LOGGER.info("City ${city}") },
    { t -> t.printStackTrace() }
)


Here the first page of results expands to the second page, the second page to the third, and so on, until there are no more pages to retrieve.
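
Stripped of Spring Data and Reactor, the expansion is just a "keep asking the current page for its successor" loop. A generic sketch with stdlib types only (the page type and page numbers here are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Generic unroll: start from the first page and keep asking each page for
// its successor until there is none.
public class UnrollSketch {
    public static <P> List<P> unroll(P first, Function<P, Optional<P>> next) {
        List<P> pages = new ArrayList<>();
        P current = first;
        while (current != null) {
            pages.add(current);
            // An empty Optional ends the expansion, like Mono.empty() does for expand
            current = next.apply(current).orElse(null);
        }
        return pages;
    }

    public static List<Integer> demo() {
        // Three hypothetical pages numbered 0, 1 and 2
        return unroll(0, page -> page < 2 ? Optional.of(page + 1) : Optional.empty());
    }
}
```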

Traversing a Tree


Consider a node in a tree structure represented by the following model:

data class Node(
    val id: String,
    val nodeRefs: List<String>,
)

A sample tree - a root node "1" with two children, each of which has two children of its own - can be traversed using a call which looks like this:

val rootMono: Mono<Node> = nodeService.getNode("1")
val expanded: Flux<Node> = rootMono.expand { node ->
    Flux.fromIterable(node.nodeRefs)
        .flatMap { nodeRef -> nodeService.getNode(nodeRef) }
}
expanded.subscribe { node -> println(node) }

This is a breadth-first expansion; the output looks like this:

Node-1
Node-1-1
Node-1-2
Node-1-1-1
Node-1-1-2
Node-1-2-1
Node-1-2-2

An "expandDeep" variation would traverse it depth-first.
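
The breadth-first order above is exactly what a queue-based traversal produces. A small stdlib sketch of the same expansion (the tree is held in a plain Map here):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// What expand does, sketched with a plain queue: a breadth-first traversal
// where each emitted node is then asked for its children.
public class ExpandSketch {
    public static List<String> breadthFirst(String root, Map<String, List<String>> children) {
        List<String> visited = new ArrayList<>();
        Queue<String> frontier = new ArrayDeque<>();
        frontier.add(root);
        while (!frontier.isEmpty()) {
            String node = frontier.poll();
            visited.add(node); // expand emits the node itself...
            frontier.addAll(children.getOrDefault(node, List.of())); // ...then queues its expansion
        }
        return visited;
    }

    public static List<String> demo() {
        // The sample tree from the post
        return breadthFirst("Node-1", Map.of(
                "Node-1", List.of("Node-1-1", "Node-1-2"),
                "Node-1-1", List.of("Node-1-1-1", "Node-1-1-2"),
                "Node-1-2", List.of("Node-1-2-1", "Node-1-2-2")));
    }
}
```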

Monday, January 20, 2020

Spring WebClient and Java date-time fields

WebClient is Spring Framework's reactive client for making service to service calls.

WebClient has become a go-to utility for me; however, I recently ran into an unexpected issue in the way it handles Java 8 date-time fields that tripped me up, and this post goes into the details.


Happy Path

First the happy path. When using WebClient, Spring Boot advises that a "WebClient.Builder" be injected into a class instead of the "WebClient" itself, and a WebClient.Builder is already auto-configured and available for injection.


Consider a fictitious "City" domain and a client to create a "City". "City" has a simple structure; note that the creationDate is a Java 8 "Instant" type:

import java.time.Instant

data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long,
    val creationDate: Instant = Instant.now()
)

The client to create an instance of this type looks like this:

class CitiesClient(
    private val webClientBuilder: WebClient.Builder,
    private val citiesBaseUrl: String
) {
    fun createCity(city: City): Mono<City> {
        val uri: URI = UriComponentsBuilder
            .fromUriString(citiesBaseUrl)
            .path("/cities")
            .build()
            .encode()
            .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .bodyValue(city)
            .exchange()
            .flatMap { clientResponse ->
                clientResponse.bodyToMono(City::class.java)
            }
    }
}

See how the intent is expressed in a fluent way: the uri and the headers are first set, the request body is then put in place, and the response is unmarshalled back to a "City" response type.


All well and good. Now, what does a test look like?

I am using the excellent WireMock to bring up a dummy remote service and using this CitiesClient to send the request, along these lines:

@SpringBootTest 
@AutoConfigureJson
class WebClientConfigurationTest {

    @Autowired
    private lateinit var webClientBuilder: WebClient.Builder

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @Test
    fun testAPost() {
        val dateAsString = "1985-02-01T10:10:10Z"

        val city = City(
            id = 1L, name = "some city",
            country = "some country",
            pop = 1000L,
            creationDate = Instant.parse(dateAsString)
        )
        WIREMOCK_SERVER.stubFor(
            post(urlMatching("/cities"))
                .withHeader("Accept", equalTo("application/json"))
                .withHeader("Content-Type", equalTo("application/json"))
                .willReturn(
                    aResponse()
                        .withHeader("Content-Type", "application/json")
                        .withStatus(HttpStatus.CREATED.value())
                        .withBody(objectMapper.writeValueAsString(city))
                )
        )

        val citiesClient = CitiesClient(webClientBuilder, "http://localhost:${WIREMOCK_SERVER.port()}")

        val citiesMono: Mono<City> = citiesClient.createCity(city)

        StepVerifier
            .create(citiesMono)
            .expectNext(city)
            .expectComplete()
            .verify()


        //Ensure that date field is in ISO-8601 format..
        WIREMOCK_SERVER.verify(
            postRequestedFor(urlPathMatching("/cities"))
                .withRequestBody(matchingJsonPath("$.creationDate", equalTo(dateAsString)))
        )
    }

    companion object {
        private val WIREMOCK_SERVER =
            WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().notifier(ConsoleNotifier(true)))

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            WIREMOCK_SERVER.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            WIREMOCK_SERVER.stop()
        }
    }
}

With the verify step at the end of the test, I want to make sure that the remote service receives the date in ISO-8601 format as "1985-02-01T10:10:10Z". In this instance everything works cleanly and the test passes.

Not so happy path

Consider now a case where I have customized the WebClient.Builder in some form. An example: say I am using a registry service, want to look up a remote service via this registry, and then make a call; the WebClient.Builder then has to be customized with a "@LoadBalanced" annotation - some details here.

So say, I have customized WebClient.Builder this way:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder().filter { req, next ->
            LOGGER.error("Custom filter invoked..")
            next.exchange(req)
        }
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

It looks straightforward; however, the previous test now fails. Specifically, the date format of the creationDate over the wire is no longer ISO-8601 - the raw request looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": 476100610.000000000
}

vs for a working request:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

See how the date format is different.

Problem

The underlying reason for this issue is simple: Spring Boot adds a bunch of configuration to WebClient.Builder that is lost when I create the bean explicitly myself. Specifically, in this instance, there is a Jackson ObjectMapper created under the covers which by default writes dates as timestamps - some details here.
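
The two wire formats seen above can be reproduced with java.time alone: the same Instant either as an epoch-based number (roughly what Jackson's timestamp default emits) or as an ISO-8601 string (what the Java 8 date-time customization produces).

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// The same Instant, rendered the two ways seen in the raw requests above.
public class InstantFormats {
    public static long epochSecondsOf(String isoDate) {
        // Epoch-based number: the integral part of 476100610.000000000
        return Instant.parse(isoDate).getEpochSecond();
    }

    public static String asIso8601(String isoDate) {
        // ISO-8601 string, e.g. "1985-02-01T10:10:10Z"
        return DateTimeFormatter.ISO_INSTANT.format(Instant.parse(isoDate));
    }
}
```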

Solution

Okay, so how do we get back the customizations that Spring Boot makes? I have essentially replicated the behavior of a Spring Boot auto-configuration called "WebClientAutoConfiguration", and it looks like this:

@Configuration
class WebClientConfiguration {

    @Bean
    fun webClientBuilder(customizerProvider: ObjectProvider<WebClientCustomizer>): WebClient.Builder {
        val webClientBuilder: WebClient.Builder = WebClient
            .builder()
            .filter { req, next ->
                LOGGER.error("Custom filter invoked..")
                next.exchange(req)
            }

        customizerProvider.orderedStream()
            .forEach { customizer -> customizer.customize(webClientBuilder) }

        return webClientBuilder
    }

    companion object {
        val LOGGER = loggerFor<WebClientConfiguration>()
    }
}

There is likely a better approach than just replicating this behavior, but this approach works for me.

The posted content now looks like this:

{
    "id": 1,
    "name": "some city",
    "country": "some country",
    "pop": 1000,
    "creationDate": "1985-02-01T10:10:10Z"
}

with the date back in the right format.

Conclusion

Spring Boot's auto-configuration for WebClient provides an opinionated set of defaults. If for any reason the WebClient and its builder need to be configured explicitly, be wary of the customizations that Spring Boot adds and replicate them for the customized bean. In my case, the Jackson customization for Java 8 dates was missing in my custom "WebClient.Builder" and had to be explicitly accounted for.

A sample test and a customization are available here.