Monday, January 18, 2021

Generating a stream of Fibonacci numbers

A Java Stream represents a potentially infinite sequence of data. This is a short post that goes into the mechanics involved in generating a simple stream of Fibonacci numbers.


The simplest way to get this stream of data is to use the generate method of Stream.

As you can imagine, generating a specific Fibonacci number in this sequence requires the previous two numbers, which means the state of the previous two numbers needs to be maintained somewhere. The two solutions that I will be describing here both maintain this state, but they do it differently.



Mutable State

In the first approach, I am just going to maintain state this way:


class FibState {
    private long[] prev = new long[2];
    private int index = 0;

    public long nextFib() {
        long result = (index == 0) ? 1
                : ((index == 1) ? 1 : prev[0] + prev[1]);
        prev[0] = prev[1];
        prev[1] = result;
        index++;
        return result;
    }
}


Here index keeps track of the position of the current Fibonacci number in the sequence and prev captures the most recent two values. The next number in the series is generated by mutating the index and updating the array holding the recent values. Given this state, how do we generate the stream? Using code which looks like this:

   
Stream<Long> streamOfFib() {
    FibState fibState = new FibState();
    return Stream.generate(() -> fibState.nextFib());
}

This uses a closure to capture the fibState and mutate it repeatedly as the stream of numbers is generated. The approach works well, though the thought of mutating one value should probably induce a level of dread: is it thread safe (probably not)? Will it work for parallel streams (likely not)? It should suffice, however, for cases where access is strictly sequential. A far better approach is to get a version of the state that is immutable.

Immutable State


class FibState {
    private final long[] prev;
    private final int index;

    public FibState() {
        this(new long[]{-1, 1}, 0);
    }

    public FibState(long[] prev, int index) {
        this.prev = prev;
        this.index = index;
    }

    public FibState nextFib() {
        int nextIndex = index + 1;
        long result = (nextIndex == 1) ? 1 : prev[0] + prev[1];
        return new FibState(new long[]{prev[1], result}, nextIndex);
    }

    public long getValue() {
        return prev[1];
    }
}
Instead of mutating the state, each call returns the next immutable state. Now that this version of the state is available, how can it be used? By using the "iterate" function of Stream, like this:
Stream<Long> streamOfFib() {
    return Stream
            .iterate(new FibState(), fibState -> fibState.nextFib())
            .map(fibState -> fibState.getValue());
}

This function takes two parameters: the initial state and a function which can generate the next state. To return numbers from it, the "state" type is mapped to a number in the "map" operation.
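As a quick usage sketch (assuming the streamOfFib() above), the first ten numbers can be pulled out of the otherwise infinite stream with a "limit":

List<Long> firstTen = streamOfFib()
        .limit(10)
        .collect(Collectors.toList());
// [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

The "limit" call is what makes it safe to consume an infinite stream like this one.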


Conclusion

This is generally instructive on how to generate a stream, using the Fibonacci sequence as an example. The approach will be fairly similar for any stream that you may need to generate. My personal preference is the immutable state variation, as it delegates most of the mechanics of generating the stream to the excellent "iterate" helper function of Stream.

Monday, November 30, 2020

AWS SDK 2 for Java and storing a Json in DynamoDB

AWS DynamoDB is described as a NoSQL key-value and document database. In my work I mostly use the key-value behavior of the database and rarely the document database features; however, the document database part is growing on me. This post highlights some ways of using the document database features of DynamoDB, along with introducing a small utility library built on top of AWS SDK 2.x for Java that simplifies using them.

The treatment of the document database features will be fairly high level in this post; I plan a follow-up which goes into more detail later.


DynamoDB as a document database

So what does it mean for AWS DynamoDB to be treated as a document database? Consider a json representation of an entity, say something representing a Hotel:

{
    "id": "1",
    "name": "test",
    "address": "test address",
    "state": "OR",
    "properties": {
        "amenities":{
            "rooms": 100,
            "gym": 2,
            "swimmingPool": true                    
        }                    
    },
    "zip": "zip"
}
This json has some top-level attributes like "id", "name" and "address", but it also has a free-form "properties" field holding some additional "nested" attributes of the hotel.

A document database can store this document representing the hotel in its entirety, OR it can treat individual fields, say the "properties" field of the hotel, as a document. 
A naive way to do this would be to simply serialize the content into a json string and store it - for example, transform the properties field into a string representation of the json and store that in the database. This works, but there are a few issues with it. 
  1. None of the attributes of a field like properties can be queried for - say, if I wanted to know whether the hotel has a swimming pool, there is no way to get this information out of the stored content. 
  2. The attributes cannot be filtered on - so, say, if I wanted hotels with at least 2 gyms, this is not something that can be filtered down to. 

A document database allows the entire document to be saved and individual attributes, both top-level and nested ones, to be queried and filtered on. 
So, for example, in the "hotel" document the top-level attributes are "id", "name", "address", "state", "zip" and the nested attributes are "properties.amenities.rooms", "properties.amenities.gym", "properties.amenities.swimmingPool" and so on.

AWS SDK 2 for DynamoDB and Document database support

If you are writing a Java-based application to interact with an AWS DynamoDB database, then you have likely used the new AWS SDK 2 library to make the API calls. However, one issue with the library is that it does not natively support a json-based document model. Let me go into a little more detail here.

From the perspective of AWS SDK 2 for DynamoDB, every attribute that is saved is an instance of something called an AttributeValue.
A row of data, say for a hotel, is simply a map of "attribute" names to AttributeValues, and sample code looks something like this:
val putItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build(),
            NAME to AttributeValue.builder().s(hotel.name).build(),
            ZIP to AttributeValue.builder().s(hotel.zip).build(),
            STATE to AttributeValue.builder().s(hotel.state).build(),
            ADDRESS to AttributeValue.builder().s(hotel.address).build(),
            PROPERTIES to AttributeValue.builder().s(objectMapper.writeValueAsString(hotel.properties)).build(),
            VERSION to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()
dynamoClient.putItem(putItemRequest)
Here a map of each attribute name to an AttributeValue is being created with an appropriate "type" of content - "s" indicates a string and "n" a number in the above sample.

There are other AttributeValue types like "m" representing a map and "l" representing a list. 

The neat thing is that "m" and "l" types can have nested AttributeValues, which maps to a structured json document, however there is no simple way to convert a json to this kind of an Attribute Value and back. 

So, for example, if I were to handle the raw "properties" of a hotel in a way that understands the nested attributes, an approach could be this:
val putItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build(),
            NAME to AttributeValue.builder().s(hotel.name).build(),
            ZIP to AttributeValue.builder().s(hotel.zip).build(),
            STATE to AttributeValue.builder().s(hotel.state).build(),
            ADDRESS to AttributeValue.builder().s(hotel.address).build(),
            PROPERTIES to AttributeValue.builder()
                .m(
                    mapOf(
                        "amenities" to AttributeValue.builder()
                            .m(
                                mapOf(
                                    "rooms" to AttributeValue.builder().n("200").build(),
                                    "gym" to AttributeValue.builder().n("2").build(),
                                    "swimmingPool" to AttributeValue.builder().bool(true).build()
                                )
                            )
                            .build()
                    )
                )
                .build(),
            VERSION to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()
See how the nested attributes are being expanded out recursively. 
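Once stored this way, the nested attributes can be filtered on directly using a document path in a filter expression - for example (a Java sketch, where the table name, the dynamoDbClient and the chosen values are assumptions purely for illustration):

ScanRequest scanRequest = ScanRequest.builder()
        .tableName("hotels")
        // a document path reaching into the nested attributes
        .filterExpression("#p.#a.swimmingPool = :pool AND #p.#a.gym >= :gyms")
        .expressionAttributeNames(Map.of("#p", "properties", "#a", "amenities"))
        .expressionAttributeValues(Map.of(
                ":pool", AttributeValue.builder().bool(true).build(),
                ":gyms", AttributeValue.builder().n("2").build()))
        .build();
ScanResponse response = dynamoDbClient.scan(scanRequest);

This is only possible because "properties" is stored as a nested AttributeValue and not as an opaque json string.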
 

Introducing the Json to AttributeValue utility library

This is exactly where the utility library that I have developed comes in. 

Given a json structure as a Jackson JsonNode, it converts the json into an appropriately nested AttributeValue type, and when retrieving back from DynamoDB it can convert the resulting nested AttributeValue back to a json. 

The structure looks exactly like the handcrafted sample shown before. So, using the utility, saving the "properties" would look like this:
val putItemRequest = PutItemRequest.builder()
    .tableName(TABLE_NAME)
    .item(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build(),
            NAME to AttributeValue.builder().s(hotel.name).build(),
            ZIP to AttributeValue.builder().s(hotel.zip).build(),
            STATE to AttributeValue.builder().s(hotel.state).build(),
            ADDRESS to AttributeValue.builder().s(hotel.address).build(),
            PROPERTIES to JsonAttributeValueUtil.toAttributeValue(hotel.properties),
            VERSION to AttributeValue.builder().n(hotel.version.toString()).build()
        )
    )
    .build()
dynamoClient.putItem(putItemRequest)
and when querying back from DynamoDB, the resulting nested AttributeValue is converted back to a json this way (Kotlin code, in case you are baffled by the "?.let"):
properties = map[PROPERTIES]?.let { attributeValue ->
    JsonAttributeValueUtil.fromAttributeValue(
        attributeValue
    )
} ?: JsonNodeFactory.instance.objectNode()
The neat thing is that even the top-level attributes can be generated, given a json representing the entire Hotel type. So, say a json representing a Hotel is provided:
val hotel = """
    {
        "id": "1",
        "name": "test",
        "address": "test address",
        "state": "OR",
        "properties": {
            "amenities":{
                "rooms": 100,
                "gym": 2,
                "swimmingPool": true                    
            }                    
        },
        "zip": "zip"
    }
""".trimIndent()
val attributeValue = JsonAttributeValueUtil.toAttributeValue(hotel, objectMapper)
dynamoDbClient.putItem(
    PutItemRequest.builder()
            .tableName(DynamoHotelRepo.TABLE_NAME)
            .item(attributeValue.m())
            .build()
    )


Using the Library

The utility library is available here - https://github.com/bijukunjummen/aws-sdk2-dynamo-json-helper - and the project provides details of how to get the binaries in place and use them in code.

 

Conclusion

AWS SDK 2 is an excellent and highly performant client, providing non-blocking support for client calls. I like how it provides both a synchronous API and an asynchronous API, and remains highly opinionated in consistently providing a low-level client API for calling the different AWS services. This utility library provides a nice bridge for AWS SDK 2 to remain low level while still being able to manage json-based document persistence and back. All the samples in this post are available in my github repository here - https://github.com/bijukunjummen/dynamodb-document-sample

Sunday, November 15, 2020

Permutation - Heap's Algorithm

This is a little bit of experimentation that I did recently to figure out reasonable code to get all possible permutations of a set of characters. 


So, say, given a set of characters "ABC", my objective is to come up with code which can spit out "ABC", "ACB", "BAC", "BCA", "CBA", "CAB". 


The approach I took is to go with the definition of permutation itself, so with "ABCD" as the set of characters there are 4 slots that need to be filled.



The first slot can be filled by any of A, B, C, D - in 4 ways.

The second slot by any of the remaining 3 characters - so with "A" in the first slot, in 3 ways.

The third slot by the remaining 2 characters - so with "A", "B" in the first two slots, in 2 ways. And finally, the fourth slot by the remaining 1 character - with say "A", "B", "C" in the first 3 slots.

In total, there would be 4 for the first slot * 3 for the 2nd slot * 2 for the 3rd slot * 1 for the 4th slot - 24 permutations altogether. 

I can do this in place, using a recursive slot-filling algorithm.
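A sketch of this slot-filling approach (an assumed reconstruction, not necessarily the exact original code, but consistent with the swap counts quoted further below):

static void permutate(char[] chars, int index, List<String> result) {
    if (index == chars.length - 1) {
        result.add(new String(chars));
        return;
    }
    for (int i = index; i < chars.length; i++) {
        // place the candidate character into the current slot (skipping the no-op self swap)...
        if (i != index) swap(chars, index, i);
        // ...fill the remaining slots...
        permutate(chars, index + 1, result);
        // ...and restore, so the next candidate starts from the same arrangement
        if (i != index) swap(chars, index, i);
    }
}

static void swap(char[] chars, int i, int j) {
    char temp = chars[i];
    chars[i] = chars[j];
    chars[j] = temp;
}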






A trace of flow and the swaps is here:
The only trick here is that the code does all the holding of characters and getting them into the right slots in place, by swapping the right characters into the right places and restoring them at the end. 

This works well for a reasonably sized set of characters - reasonable because for just 10 characters, there would be 3,628,800 permutations. 

An algorithm that works even better, though how it actually functions is a complete mystery to me (it is well explained here if anybody is interested), is Heap's Algorithm. It very efficiently does just one swap per permutation - still a lot of swaps in absolute terms, but better than the approach that I have described before. 
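A standard recursive Java version looks something like this (a sketch, reusing the swap helper from above; it is initially called with k set to the length of the array):

static void heapPermutation(char[] chars, int k, List<String> result) {
    if (k == 1) {
        result.add(new String(chars));
        return;
    }
    // permutations of the first k-1 characters, leaving the character at position k-1 in place
    heapPermutation(chars, k - 1, result);
    for (int i = 0; i < k - 1; i++) {
        // which character is swapped into position k-1 depends on the parity of k
        if (k % 2 == 0) {
            swap(chars, i, k - 1);
        } else {
            swap(chars, 0, k - 1);
        }
        heapPermutation(chars, k - 1, result);
    }
}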


In a sample permutation run of 8 characters, which generates 40320 permutations, the home-cooked version swaps 80638 times and Heap's algorithm swaps 40319 times, thus proving its efficacy.

Tuesday, October 13, 2020

An experiment with Little's Law

My previous blog post about Project Reactor and Backpressure was about how Project Reactor provides sane defaults for scenarios where a lot more information is produced than a Consumer can consume. It does this by throttling the Producer such that in a steady-state the Producer's rate of production matches the Consumer's rate of consumption. This throttling is referred to as Backpressure.

For a stream of data, ultimately the Producer and the Consumer reach a steady state where the Producer is producing at a rate that the consumer is comfortable consuming.

So now, in this stable system, how many items are in the system at any point in time? I decided to simply collect metrics for this information, by keeping a counter which is incremented and decremented as items are added into the system by the Producer and later consumed and removed by the Consumer.

Little's Law

This is a well-known problem, however, and through more reading I realized that this need not be measured but can instead be calculated using Little's Law, defined the following way:
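In its usual form the law states that the long-run average number of items in the system equals the average arrival rate multiplied by the average time an item spends in the system:

L = λ * W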



For my case, this maps to :

L - Number of Items in the system

λ - Consumption or Production Rate

W - Average Time spent in the System

Of course, it does require measuring the Consumption rate and the Average Time spent in the system! The point is that knowing any 2 of the values, the remaining value can be easily calculated.
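As a purely illustrative example (hypothetical numbers): if items are consumed at a steady rate of λ = 4 per second and each item spends an average of W = 5 seconds in the system, then L = 4 * 5 = 20 items are in the system at any point in time.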

Experiment

The exercise that I next performed was to compare the values that I measured in the system against the value calculated using Little's Law. No surprises here - the value measured by the system closely matches Little's Law!

Let me consider a few of these scenarios here. To recap, my simple application consists of a Producer that can produce a sequence of numbers at a pre-defined rate. These sequences of numbers are consumed by a set of consumers.





Scenario 1: Unstable system with a high Producer rate

The first scenario that I considered is one where Little's Law is actually not supposed to work effectively: an unstable system where the Producer and the Consumer produce and consume at different rates. In my example the Producer produces a large amount of data (256 records at a time, at the rate of 10 per second), waits for the Consumers to catch up (at the rate of 4 per second) and then produces the next batch of data. You can imagine that a lot of data will be buffered in the system and the L value will be high.

A graph of the calculated (in Yellow) and measured (in Green) L values shows up the following way:



The L is around 150, so about 150 records are in the system. Although this is not a stable system, the calculated L value matches the measured L value fairly well. 


Scenario 2: Stable System with similar Producer and Consumer rate

Little's Law shines for a stable system. Consider the following scenario where the Producer and Consumer rates match up. A graph now looks like this:


 

This time the measured L value lines up perfectly with the calculated value, thus proving the utility of Little's law.

Conclusion

There is not much to conclude here - Little's Law is a proven law, and the interest for me was in observing how well it pans out in an experiment. Personally, it has been satisfying to see a law line up with an experiment.

I have this entire set-up in a github repository here if you would like to replicate it.

Sunday, July 26, 2020

Backpressure in Project Reactor

Project Reactor implements the Reactive Streams specification, which is a standard for asynchronously processing a stream of data while respecting the processing capabilities of a consumer. 

At a very broad level, there are two entities involved: a Producer that produces a stream of data and a Consumer that consumes the data. If the rate at which a Consumer consumes data is less than the rate at which a Producer produces it (referred to as a Fast Producer/Slow Consumer), then signals from the Consumer can constrain the rate of production. This is referred to as Backpressure, and in this post I will be demonstrating a few backpressure examples using Project Reactor.

Before I go ahead, I have to acknowledge that these examples are loosely based on what I learned from the "Reactive Programming with RxJava" book.

Producer

Flux in Project Reactor represents an asynchronous stream of 0..N data, where N can potentially be infinite. 

Consider a simple example, generating a sequence of numbers. There are built-in ways in Flux to do this, but for the example, I will be using an operator called Flux.generate. Sample code looks like this:
fun produce(targetRate: Int, upto: Long): Flux<Long> {
    val delayBetweenEmits: Long = 1000L / targetRate

    return Flux.generate(
        { 1L },
        { state: Long, sink: SynchronousSink<Long> ->
            sleep(delayBetweenEmits)
            val nextState: Long = state + 1
            if (state > upto) {
                sink.complete()
                nextState
            } else {
                LOGGER.info("Emitted {}", state)
                sink.next(state)
                nextState
            }
        }
    )
}

Here "targetRate" is the rate per second at which the Producer is expected to produce a sequence of numbers and "upto" represents the range for which the sequence is to be generated. "Thread.sleep" is used for introducing the delay between emissions.


Consumer

A consumer for this stream of data just consumes the sequence of numbers, and to simulate processing while consuming, delays are again introduced just before reading the information, along these lines:
val delayBetweenConsumes: Long = 1000L / consumerRate
producer.produce(producerRate, count)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }

Just like the rate on the Producer side, there is a rate of consumption on the Consumer side which drives the delay before consuming the data.


Scenario 1: Fast Producer, Slow Consumer without Threading

Now that I have a stream of data for which I can control the rate of production and rate of consumption, the first test that I ran was with the producer and the consumer chained together. 

The Producer produces at the rate of 100 requests a second and the Consumer consumes at 3 per second. 

If there were no backpressure mechanisms in place, you would expect that the Producer would merrily go along and produce all the records at its own pace of 100 per second and the Consumer would slowly catch up at the rate of 3 per second. This is NOT what happens though. 

The reason is not that intuitive, I feel, and it is not really backpressure coming into play either. The Producer is constrained to 3 requests per second merely because the entire flow from the Producer to the Consumer is synchronous by default, and since the production and the consumption happen on the same thread, the behavior is automatically constrained to what the Consumer is comfortable consuming. 

Here is a graph which simply plots the rate of production and consumption over time and captures clearly the exact same rate of Production and Consumption throughout:


This behavior is borne out by the logs as well, which show that the consumer and producer remain in sync:
2020-07-26 17:51:58.712  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 84
2020-07-26 17:51:59.048  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 84
2020-07-26 17:51:59.059  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 85
2020-07-26 17:51:59.393  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 85
2020-07-26 17:51:59.404  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 86
2020-07-26 17:51:59.740  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 86
2020-07-26 17:51:59.751  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 87
2020-07-26 17:52:00.084  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 87
2020-07-26 17:52:00.095  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 88
2020-07-26 17:52:00.430  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 88
2020-07-26 17:52:00.441  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 89
2020-07-26 17:52:00.777  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 89
2020-07-26 17:52:00.788  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 90
2020-07-26 17:52:01.087  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 90
2020-07-26 17:52:01.097  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 91
2020-07-26 17:52:01.432  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 91
2020-07-26 17:52:01.442  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 92
2020-07-26 17:52:01.777  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 92
2020-07-26 17:52:01.788  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 93
2020-07-26 17:52:02.123  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 93
2020-07-26 17:52:02.133  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 94
2020-07-26 17:52:02.467  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 94
2020-07-26 17:52:02.478  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 95
2020-07-26 17:52:02.813  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 95
2020-07-26 17:52:02.824  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 96
2020-07-26 17:52:03.157  INFO 1 --- [pool-1-thread-1] sample.meter.Consumer                    : Consumed 96
2020-07-26 17:52:03.168  INFO 1 --- [pool-1-thread-1] sample.meter.Producer                    : Emitted 97


Scenario 2: Fast Producer, Slow Consumer with Threading

The second scenario that I considered was with the Producer and the Consumer running independently on different threads. 

Project Reactor makes this possible through two operators: subscribeOn(), which in my case changes the thread on which the Producer produces the sequence, and publishOn(), which shifts the consumption to a different thread. 

With these in place, the code looks like this:
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
The results were a little surprising; this is what I saw in the logs:
...
2020-07-26 18:42:41.774  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 252
2020-07-26 18:42:41.786  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 253
2020-07-26 18:42:41.797  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 254
2020-07-26 18:42:41.809  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 255
2020-07-26 18:42:41.819  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 256
2020-07-26 18:42:42.019  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 9
2020-07-26 18:42:42.354  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 10
2020-07-26 18:42:42.689  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 11
2020-07-26 18:42:43.024  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 12
2020-07-26 18:42:43.358  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 13
2020-07-26 18:42:43.691  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 14
2020-07-26 18:42:44.027  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 15
2020-07-26 18:42:44.363  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 16
.....
2020-07-26 18:43:43.724  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 299
2020-07-26 18:43:43.735  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 300
2020-07-26 18:43:43.913  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 194
2020-07-26 18:43:44.248  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 195
2020-07-26 18:43:44.581  INFO 1 --- [      publish-2] sample.meter.Consumer                    : Consumed 196
...
A sequence of numbers up to 256 was produced immediately, and then the Producer waited for the Consumer to catch up; once the Consumer caught up, the remaining emissions happened. This is how the graph for this looks: 



Clearly, backpressure is acting on this stream of data. The surprising aspect for me was that the backpressure appeared to trigger at a large value of 256 records from upstream.

Analyzing this a little, I realized the reason is that an intermediate operation is buffering the requests. The intermediate operation in this instance happens to be the "publishOn()" operator that I am using; a variant of "publishOn()" which additionally takes a prefetch parameter fixes the size of the buffer. 

In my case, setting it to 10 felt reasonable; the code looks like this now:
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .subscribe { value: Long ->
        sleep(delayBetweenConsumes)
        logger.info("Consumed {}", value)
    }
and the graph shows the Producer and Consumer remaining closely in sync:

Producer In Green, Consumer in Red


Scenario 3: Fast Producer, Multi-threaded Consumer

If you look closely at the names of the threads in the logs from the first two scenarios, you will notice that the thread at the point of production and the thread at the point of consumption are always the same. The operators "publishOn()" and "subscribeOn()" don't parallelize the operation; they only switch the execution context of the operations. To really parallelize the operations, two approaches can be taken: 

  1. Using the parallel operator 
  2. Using flatMap flavors with their own "subscribeOn" operators 

For the 3rd scenario, I went for the second option of using flatMap and it looks something like this:
producer.produce(producerRate, count)
    .subscribeOn(subscribeOnScheduler)
    .publishOn(publishOnScheduler, 10)
    .flatMap({ value: Long ->
        Mono.fromSupplier {
            sleep(delayBetweenConsumes)
            logger.info("Consumed {}", value)
            null
        }.subscribeOn(flatMapScheduler)
    }, concurrency)
    .subscribe()
The work of consuming the produced sequence of numbers is being done inside the flatMap operation, with the number of concurrent consumers set to 5 by default. Running this scenario produces the following logs; the consumers are now running 5 at a time on multiple threads:
2020-07-26 23:26:27.212  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 1
2020-07-26 23:26:27.321  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 2
2020-07-26 23:26:27.423  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 3
...
2020-07-26 23:26:28.040  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 9
2020-07-26 23:26:28.143  INFO 1 --- [    subscribe-3] sample.meter.Producer                    : Emitted 10
2020-07-26 23:26:28.222  INFO 1 --- [      flatMap-4] sample.meter.Consumer                    : Consumed 1
2020-07-26 23:26:28.328  INFO 1 --- [      flatMap-5] sample.meter.Consumer                    : Consumed 2
2020-07-26 23:26:28.428  INFO 1 --- [      flatMap-6] sample.meter.Consumer                    : Consumed 3
2020-07-26 23:26:28.527  INFO 1 --- [      flatMap-7] sample.meter.Consumer                    : Consumed 4
...
The rate of production lines up with the rate of consumption:

Producer In Green/Consumer in Red


Conclusion

These are the different experiments that I was able to run to simulate backpressure scenarios with Project Reactor, and the behavior should hold true for most Reactive Streams based libraries. 

Project Reactor has sane defaults in managing the backpressure needs of a Consumer and provides ways to override the defaults. 
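As a minimal sketch of one such override (not part of the scenarios above), the limitRate operator caps how much demand is signalled upstream at a time; "process" here is a hypothetical consumer:

Flux.range(1, 1_000)
    .limitRate(10) // cap the demand requested from upstream to batches of at most 10
    .subscribe(value -> process(value));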

In all scenarios that I have run in this post, the Producer throttled the production to a rate that the Consumer was comfortable in consuming. 

 If you are interested in exploring the scenarios further, my codebase along with the grafana/prometheus set up for graphing the output is available in my github repository here - https://github.com/bijukunjummen/backpressure-demo

Saturday, July 4, 2020

Expressing a conditional expression using Json - Java Implementation

I had a need recently to express a conditional expression in a form that both a front-end Javascript application and a backend Java application could create and read. Expressing the conditional expression as Json felt logical, and after a quick search the JsonLogic library appeared to fit exactly what I was looking for.

JsonLogic follows a prefix notation for its expressions, along these lines:

{"operator" : ["values" ... ]}
So, for example, given JSON input data that looks like this:
{
  "a": "hello",
  "b": 1,
  "c": [
    "elem1",
    "elem2"
  ]
}
For equality, an expression using JsonLogic is the following:
{"==" : [ { "var" : "a" }, "hello" ] }
Here the data is being looked up using the "var" expression and the equality is checked using the "==" operator. 

Though it is a good fit, I decided to go with an alternate way of expressing the conditional expression, heavily inspired by JsonLogic. So, in my implementation, equality with the sample JSON looks like this:
{
    "equals": [
        "/a", "hello"
    ]
}
Fairly similar - the location of the data is expressed as a Json Pointer and the operators are textual ("equals" vs "=="). The full set of supported features is also much smaller than JsonLogic's, as that sufficed for my needs on the project. So now I have a small Java-based library that supports these simplified conditional expressions, and this post will go into the details of the operators and the usage of the library.



Sample Expressions

Once more to touch on the sample conditional expressions, everything takes the form of:
{"operator" : ["values" ... ]}
A check for equality looks like this:
{
    "equals": [
        "/a", "hello"
    ]
}
Not operator:
{
    "not": [
        {
            "equals": [
                "/a",
                "hello"
            ]
        }
    ]
}
And/Or operator:
{
    "and": [
        {
            "equal": [
                "/a", "hello"
            ]
        },
        {
            "equal": [
                "/b", 1
            ]
        }
    ]
}
There are a few operators that work on collections - for example, to check if "c" in the sample JSON has the elements "elem1", "elem2":
{
    "contains": [
        "/c", ["elem1", "elem2"]
    ]
}
or to check if the collection has any of the elements "elem1", "elem2":
{
    "containsAnyOf": [
        "/c", ["elem1", "elem2"]
    ]
}

Details of the Library

The Java-based library is built on top of the excellent Jackson JSON parser library and uses it to parse the expression, which once parsed is interpreted by the library. A Gradle-based project can pull in the dependency the following way (currently published to JCenter):
implementation 'com.github.bijukunjummen:json-conditional-expression:0.4.0'
and use the library along these lines (sample Kotlin code):
val jsonExpressionEvaluator: JsonExpressionEvaluator = JsonExpressionEvaluator(ObjectMapper())
jsonExpressionEvaluator.matches(expression, json) //returns true

Conclusion

The conditional expression and the corresponding Java-based interpreter are fairly simple and have sufficed for my needs, with the kind of operators that I needed for my project; however, I will be more than happy to extend the library and support more extensive operators if there is enough interest in using it.


Reference

1. JsonLogic which provided the inspiration for using a prefix notation to represent a conditional expression

Monday, May 25, 2020

AWS DynamoDB version field using AWS SDK for Java 2

It is useful to have a version attribute on any entity saved to an AWS DynamoDB database - simply a numeric indication of the number of times the entity has been modified. When the entity is first created it can be set to 1 and then incremented on every update. 

The benefit is immediate - an indicator of the number of times an entity has been modified, which can be used for auditing the entity. An additional use is optimistic locking, where an entity is allowed to be updated only if the holder updating it has the right version of the entity.


This post will go into the details of how to introduce such a field with the DynamoDB-related libraries of AWS SDK 2.

Model

Consider a model called Hotel which is being persisted into a DynamoDB database. In Kotlin, it can be represented using the following data class:

data class Hotel(
    val id: String = UUID.randomUUID().toString(),
    val name: String,
    val address: String? = null,
    val state: String? = null,
    val zip: String? = null,
    val version: Long = 1L
)
A version field has been introduced in this model with an initial value of 1. The aim will be to save this field as-is and then let DynamoDB atomically manage the increment of this field at the point of saving the entity.

As the fields in this model get changed, I would like the version to be updated along these lines:


 


Local version of DynamoDB

It is useful to have DynamoDB running on the local machine, so that real DynamoDB tables don't have to be created in AWS.

There are multiple ways of doing this. One is to use a docker version of DynamoDB Local, which can be started up the following way to listen on port 4569:


docker run -p 4569:8000 amazon/dynamodb-local:1.13
My personal preference is to use localstack, and the instructions on the site show different ways to start it up. I normally use docker-compose to bring it up. One of the reasons to use localstack over DynamoDB Local is that localstack provides a comprehensive set of AWS services for local testing, not just DynamoDB.
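A minimal docker-compose sketch for this (the image tag, port mapping and service list are assumptions - check the localstack documentation for the currently recommended setup):

version: "3.8"
services:
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=dynamodb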


Quick Demo

I have the entire code available in my github repo here - https://github.com/bijukunjummen/boot-with-dynamodb

Once the application is brought up using the local version of DynamoDB, an entity can be created using the following httpie request:



http :9080/hotels id=4 name=name address=address zip=zip state=OR
With a response, where the version field is set to 1:
{
    "address": "address",
    "id": "4",
    "name": "name",
    "state": "OR",
    "version": 1,
    "zip": "zip"
}
Then if the name is updated for the entity:
http PUT :9080/hotels/4 name=name1 address=address zip=zip state=OR version=1
the version field gets updated to 2 and so on:
{
    "address": "address",
    "id": "4",
    "name": "name1",
    "state": "OR",
    "version": 2,
    "zip": "zip"
}
Also note that if a wrong version number is provided during an update, the call will fail, as there is optimistic locking in place using this version field.

Implementing the version field

Implementing the version field depends on the powerful UpdateItem API provided by DynamoDB. One of the features of the UpdateItem API is that it takes in an "UpdateExpression", which is a DSL describing how different DynamoDB attributes should be updated.

The raw request to AWS DynamoDB looks like this:

 
{
  "TableName": "hotels",
  "Key": {
    "id": {
      "S": "1"
    }
  },
  "UpdateExpression": "\nSET #name=:name,\n #state=:state,\naddress=:address,\nzip=:zip\nADD version :inc\n ",
  "ExpressionAttributeNames": {
    "#state": "state",
    "#name": "name"
  },
  "ExpressionAttributeValues": {
    ":name": {
      "S": "testhotel"
    },
    ":address": {
      "S": "testaddress"
    },
    ":state": {
      "S": "OR"
    },
    ":zip": {
      "S": "zip"
    },
    ":inc": {
      "N": "1"
    }
  }
}               
From the article's perspective, focus specifically on "ADD version :inc", which is an expression that tells AWS DynamoDB to increment the value of version by the ":inc" value, provided separately using "ExpressionAttributeValues" as "1". Dealing with the raw API in its json form is daunting; that is where the Software Development Kit (SDK) that AWS provides comes in. AWS SDK for Java 2 is a rewrite of the AWS SDKs with a focus on using the latest Java features and non-blocking IO over the wire. Using AWS SDK for Java 2, an "UpdateItem" call looks like this (using Kotlin code):
val updateItemRequest = UpdateItemRequest.builder()
    .tableName(TABLE_NAME)
    .key(
        mapOf(
            ID to AttributeValue.builder().s(hotel.id).build()
        )
    )
    .updateExpression(
    """
        SET #name=:name,
        #state=:state,
        address=:address,
        zip=:zip 
        ADD version :inc
    """
    )
    .conditionExpression("version = :version")
    .expressionAttributeValues(
        mapOf(
            ":${NAME}" to AttributeValue.builder().s(hotel.name).build(),
            ":${ZIP}" to AttributeValue.builder().s(hotel.zip).build(),
            ":${STATE}" to AttributeValue.builder().s(hotel.state).build(),
            ":${ADDRESS}" to AttributeValue.builder().s(hotel.address).build(),
            ":${VERSION}" to AttributeValue.builder().n(hotel.version.toString()).build(),
            ":inc" to AttributeValue.builder().n("1").build()
        )
    )
    .expressionAttributeNames(
        mapOf(
            "#name" to "name",
            "#state" to "state"
        )
    )
    .build()

val updateItem: CompletableFuture<UpdateItemResponse> = dynamoClient.updateItem(updateItemRequest)

return Mono.fromCompletionStage(updateItem)
    .flatMap {
        getHotel(hotel.id)
    }
The "updateExpression" sets all the existing fields to new values and increments the version attribute by 1. Another thing to note about this call is the "conditionExpression", which is essentially a way to tell DynamoDB to update the attributes only if a condition matches up - in this specific instance, if the existing value of version matches up. This provides a neat way to support optimistic locking on the record.
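As a sketch of what a caller can expect when the version does not match (not from the repository, and shown in Java with the synchronous DynamoDbClient for brevity), the failed condition check surfaces as the SDK's ConditionalCheckFailedException:

try {
    dynamoDbClient.updateItem(updateItemRequest);
} catch (ConditionalCheckFailedException e) {
    // the stored version no longer matches - another writer updated the record first,
    // so reload the entity and retry with the latest version
    throw new IllegalStateException("Hotel was updated concurrently, retry with the latest version", e);
}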

Conclusion

A lot of details here - the easiest way to get a feel for it is by trying out the code which is available in my github repository here - https://github.com/bijukunjummen/boot-with-dynamodb. The readme has good details on how to run it in a local environment. 

AWS DynamoDB provides a neat way to manage a version field on entities, ensuring that it is atomically updated, and provides a way for it to be used for optimistic locking.