Tuesday, April 30, 2019

Functional Hystrix using Spring Cloud HystrixCommands

Spring's WebClient provides a non-blocking client for making service-to-service calls. Hystrix, though now in maintenance mode, has long been used to protect such calls: it prevents cascading failures by providing circuit breakers for calls to slow or faulty upstream services.

In this post, I will be exploring how Spring Cloud provides a newer functional approach to wrapping a remote call with Hystrix.


Consider a simple service that returns a list of entities, say a list of cities, modeled using the excellent Wiremock tool:

WIREMOCK_SERVER.stubFor(WireMock.get(WireMock.urlMatching("/cities"))
                .withHeader("Accept", WireMock.equalTo("application/json"))
                .willReturn(WireMock.aResponse()
                        .withStatus(HttpStatus.OK.value())
                        .withFixedDelay(5000)
                        .withHeader("Content-Type", "application/json")))

When called with the URI "/cities", this Wiremock endpoint responds with JSON of the following form:

[
  {
    "country": "USA",
    "id": 1,
    "name": "Portland",
    "pop": 1600000
  },
  {
    "country": "USA",
    "id": 2,
    "name": "Seattle",
    "pop": 3200000
  },
  {
    "country": "USA",
    "id": 3,
    "name": "SFO",
    "pop": 6400000
  }
]

after a delay of 5 seconds.

Traditional approach


There are many approaches to using Hystrix. I have traditionally preferred an approach where an explicit Hystrix Command protects the remote call, along these lines:

import com.netflix.hystrix.HystrixCommandGroupKey
import com.netflix.hystrix.HystrixCommandKey
import com.netflix.hystrix.HystrixCommandProperties
import com.netflix.hystrix.HystrixObservableCommand
import org.bk.samples.model.City
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams
import rx.schedulers.Schedulers
import java.net.URI


class CitiesHystrixCommand(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) : HystrixObservableCommand<City>(
        HystrixObservableCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("cities-service"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("cities-service"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(4000))) {
    override fun construct(): Observable<City> {
        val buildUri: URI = UriComponentsBuilder
                .fromUriString(citiesBaseUrl)
                .path("/cities")
                .build()
                .encode()
                .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        val result: Flux<City> = webClient.get()
                .uri(buildUri)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany { clientResponse ->
                    clientResponse.bodyToFlux<City>()
                }

        return RxReactiveStreams.toObservable(result)
    }

    override fun resumeWithFallback(): Observable<City> {
        LOGGER.error("Falling back on cities call", executionException)
        return Observable.empty()
    }

    companion object {
        private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand::class.java)
    }
}

This code can now be used to make a remote call the following way:

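import org.bk.samples.model.City
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams
import rx.schedulers.Schedulers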


class CitiesHystrixCommandBasedClient(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) {
    fun getCities(): Flux<City> {
        val citiesObservable: Observable<City> = CitiesHystrixCommand(webClientBuilder, citiesBaseUrl)
                .observe()
                .subscribeOn(Schedulers.io())

        return Flux
                .from(RxReactiveStreams
                        .toPublisher(citiesObservable))
    }
}


Two things to note here:
1. WebClient returns a Project Reactor "Flux" type representing a list of cities. Hystrix, however, is based on Rx-Java 1, so the Flux is transformed to an Rx-Java Observable using the "RxReactiveStreams.toObservable()" call, provided by the RxJavaReactiveStreams library.

2. I still want the Project Reactor "Flux" type to be used in the rest of the application, so another adapter converts the Rx-Java Observable back to a Flux, "Flux.from(RxReactiveStreams.toPublisher(citiesObservable))", once the call wrapped in Hystrix returns.


If I try this client against the Wiremock sample with the 5 second delay, Hystrix cuts the call short when the configured 4 second execution timeout fires, and the fallback returns an empty result instead of waiting out the full delay.
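
The timeout-then-fallback behaviour itself can be illustrated without Hystrix. What follows is only a minimal sketch, assuming Java 9 or later (for "completeOnTimeout"); "slowCities" and "citiesWithFallback" are hypothetical names, the delays are shortened for the demo, and there is no circuit breaker or metrics here, which is what Hystrix adds on top:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the remote "/cities" call: answers after delayMs.
fun slowCities(delayMs: Long): CompletableFuture<List<String>> =
    CompletableFuture.supplyAsync {
        Thread.sleep(delayMs)
        listOf("Portland", "Seattle", "SFO")
    }

// Waits at most timeoutMs; if the call is still pending by then, the future is
// completed with an empty list, mirroring the empty fallback of the command.
fun citiesWithFallback(delayMs: Long, timeoutMs: Long): List<String> =
    slowCities(delayMs)
        .completeOnTimeout(emptyList<String>(), timeoutMs, TimeUnit.MILLISECONDS)
        .join()

fun main() {
    println(citiesWithFallback(delayMs = 500, timeoutMs = 100)) // slow call, prints []
    println(citiesWithFallback(delayMs = 10, timeoutMs = 1000)) // fast call, prints [Portland, Seattle, SFO]
}
```

The caller sees an empty result as soon as the timeout fires, rather than blocking for the full delay of the slow service.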


Functional approach


The previous approach involves a lot of boiler-plate, which is avoided with the new functional approach of using HystrixCommands, a utility class that comes with Spring Cloud and wraps the remote call with Hystrix in a functional style.

The entirety of the call using HystrixCommands looks like this:

import com.netflix.hystrix.HystrixCommandProperties
import org.bk.samples.model.City
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.cloud.netflix.hystrix.HystrixCommands
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToFlux
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Flux
import rx.schedulers.Schedulers
import java.net.URI

class CitiesFunctionalHystrixClient(
        private val webClientBuilder: WebClient.Builder,
        private val citiesBaseUrl: String
) {
    fun getCities(): Flux<City> {
        return HystrixCommands
                .from(callCitiesService())
                .commandName("cities-service")
                .groupName("cities-service")
                .commandProperties(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(1000)
                )
                .toObservable { obs ->
                    obs.observe()
                            .subscribeOn(Schedulers.io())
                }
                .fallback { t: Throwable ->
                    LOGGER.error(t.message, t)
                    Flux.empty()
                }
                .toFlux()
    }

    fun callCitiesService(): Flux<City> {
        val buildUri: URI = UriComponentsBuilder
                .fromUriString(citiesBaseUrl)
                .path("/cities")
                .build()
                .encode()
                .toUri()

        val webClient: WebClient = this.webClientBuilder.build()

        return webClient.get()
                .uri(buildUri)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany { clientResponse ->
                    clientResponse.bodyToFlux<City>()
                }
    }

    companion object {
        private val LOGGER: Logger = LoggerFactory.getLogger(CitiesFunctionalHystrixClient::class.java)
    }
}


A lot of boiler-plate is avoided with this approach:
1. An explicit command is not required anymore.
2. The call and the fallback are coded in a fluent manner.
3. Any overrides can be explicitly specified, in this specific instance the timeout of 1 second.

Conclusion

I like the conciseness which HystrixCommands brings to the usage of Hystrix with WebClient. I have the entire sample available in my github repo - https://github.com/bijukunjummen/webclient-hystrix-sample, and all the dependencies required to get the samples to work are part of this repo. If you are interested in sticking with Rx-Java 1, then an approach described here may help you avoid boiler-plate with vanilla Hystrix.


Tuesday, March 5, 2019

Water Pouring Problem with Kotlin and Vavr

The first time I saw the Water Pouring Problem being solved programmatically was in the excellent lectures on Functional Programming by Martin Odersky on Coursera. The solution demonstrates the power of lazy evaluation in Streams with Scala.

Solving Water Pouring Problem using Kotlin

I wanted to explore how I could rewrite the solution described by Martin Odersky using Kotlin, and I realized two things. First, the immutable data structures that Kotlin offers are simply read-only wrappers over the Java Collections library and are not truly immutable. Second, replicating the solution with the Streams feature in Java would be difficult. Vavr, however, offers a good alternative to both - a first-class immutable collections library and a Streams library - so I took a crack at replicating the solution with Kotlin and Vavr.

A Cup looks like this, represented as a Kotlin data class:

import io.vavr.collection.List
import kotlin.math.min


data class Cup(val level: Int, val capacity: Int) {
    override fun toString(): String {
        return "Cup($level/$capacity)"
    }
}

Since the water pouring problem tracks the "state" of a set of cups, the state can be represented simply as a "typealias" the following way:
typealias State = List<Cup>


There are 3 different types of moves that can be performed with the water in a cup - Empty it, Fill it, or Pour from one cup to another - each again represented as a Kotlin data class:

interface Move {
    fun change(state: State): State
}

data class Empty(val glass: Int) : Move {
    override fun change(state: State): State {
        val cup = state[glass]
        return state.update(glass, cup.copy(level = 0))
    }

    override fun toString(): String {
        return "Empty($glass)"
    }
}

data class Fill(val glass: Int) : Move {
    override fun change(state: State): State {
        val cup = state[glass]
        return state.update(glass, cup.copy(level = cup.capacity))
    }

    override fun toString(): String {
        return "Fill($glass)"
    }
}

data class Pour(val from: Int, val to: Int) : Move {
    override fun change(state: State): State {
        val cupFrom = state[from]
        val cupTo = state[to]
        val amount = min(cupFrom.level, cupTo.capacity - cupTo.level)

        return state
                .update(from, cupFrom.copy(level = cupFrom.level - amount))
                .update(to, cupTo.copy(level = cupTo.level + amount))
    }

    override fun toString(): String {
        return "Pour($from,$to)"
    }
}

The implementation makes use of the "update" method of Vavr's List data structure to create a new list with just the relevant elements updated.


A "Path" represents a history of moves leading to the current state:

data class Path(val initialState: pour.State, val endState: State, val history: List<Move>) {
    fun extend(move: Move) = Path(initialState, move.change(endState), history.prepend(move))

    override fun toString(): String {
        return history.reverse().mkString(" ") + " ---> " + endState
    }
}

I am using the "prepend" method of the list to add elements to the beginning of history. Prepending to a list is an O(1) operation whereas appending is O(n), hence the choice.
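
To make the cost difference concrete, here is a small stdlib-only sketch of a persistent linked list. "PList", "Cons", "Nil" and "toKotlinList" are hypothetical names used for illustration and are not Vavr types; the sketch only assumes that Vavr's List behaves like a classic cons list in this respect:

```kotlin
// Illustrative persistent (immutable) linked list; not Vavr's implementation.
sealed class PList<out T>
object Nil : PList<Nothing>()
data class Cons<out T>(val head: T, val tail: PList<T>) : PList<T>()

// O(1): allocates a single cell and reuses the existing list as its tail.
fun <T> PList<T>.prepend(x: T): PList<T> = Cons(x, this)

// O(n): every existing cell is copied in order to attach a new last element.
fun <T> PList<T>.append(x: T): PList<T> = when (this) {
    is Nil -> Cons(x, Nil)
    is Cons -> Cons(this.head, this.tail.append(x))
}

// Copies the elements into a regular Kotlin list, head first.
fun <T> PList<T>.toKotlinList(): List<T> {
    val out = mutableListOf<T>()
    var cur = this
    while (cur is Cons) {
        out += cur.head
        cur = cur.tail
    }
    return out
}

fun main() {
    val empty: PList<String> = Nil
    val history = empty.prepend("Fill(1)").prepend("Pour(1,0)") // newest move first
    val longer = history.prepend("Empty(0)")
    println((longer as Cons).tail === history)  // true: the old list is shared, not copied
    println(longer.toKotlinList().reversed())   // [Fill(1), Pour(1,0), Empty(0)]
}
```

Because the history is stored newest-first, it has to be reversed once for display, which is what the "toString" of Path does.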

Given a "state", the possible moves that change it are the following -

1. Empty the glasses -

(0 until count).map { Empty(it) }

2. Fill the glasses -

(0 until count).map { Fill(it) }

3. Pour from one glass to another -

(0 until count).flatMap { from ->
    (0 until initialState.length()).filter { to -> from != to }.map { to ->
        Pour(from, to)
    }
}

Now, all these moves are used for advancing from one state to another. Consider, say, 2 cups with capacities of 4 and 9 litres, initially filled with 0 litres of water, represented as "List(Cup(0/4), Cup(0/9))". Applying all possible moves gives the following next set of states of the cups:
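
The next states can also be computed with a few lines of code. This is a stdlib-only sketch that uses a plain Kotlin List instead of Vavr's; the "Cup" class mirrors the one above, and "nextStates" is a hypothetical helper name:

```kotlin
// Same shape as the Cup data class above, repeated so the sketch is self-contained.
data class Cup(val level: Int, val capacity: Int) {
    override fun toString() = "Cup($level/$capacity)"
}

// Applies every Empty, Fill and Pour move once and collects the distinct results.
fun nextStates(state: List<Cup>): Set<List<Cup>> {
    val result = linkedSetOf<List<Cup>>()
    for (i in state.indices) {
        // Empty(i)
        result += state.mapIndexed { k, c -> if (k == i) c.copy(level = 0) else c }
        // Fill(i)
        result += state.mapIndexed { k, c -> if (k == i) c.copy(level = c.capacity) else c }
        // Pour(i, j) for every other cup j
        for (j in state.indices) if (i != j) {
            val amount = minOf(state[i].level, state[j].capacity - state[j].level)
            result += state.mapIndexed { k, c ->
                when (k) {
                    i -> c.copy(level = c.level - amount)
                    j -> c.copy(level = c.level + amount)
                    else -> c
                }
            }
        }
    }
    return result
}

fun main() {
    nextStates(listOf(Cup(0, 4), Cup(0, 9))).forEach { println(it) }
    // [Cup(0/4), Cup(0/9)]
    // [Cup(4/4), Cup(0/9)]
    // [Cup(0/4), Cup(9/9)]
}
```

Of the six moves available for two cups, only the two Fill moves change anything when both cups start empty, so there are just three distinct next states.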


Similarly, advancing each of these states to a new set of states would look like this (in a somewhat simplified form):


As each State advances to a next set of states based on all possible moves, there is an explosion of possible paths. This is where the laziness offered by the Stream data structure of Vavr comes in: the values in a stream are only computed on request.


Given a set of paths, new paths are created using Stream the following way:

fun from(paths: Set<Path>, explored: Set<State>): Stream<Set<Path>> {
    if (paths.isEmpty) {
        return Stream.empty()
    } else {
        val more = paths.flatMap { path ->
            moves.map { move ->
                val next: Path = path.extend(move)
                next
            }.filter { !explored.contains(it.endState) }
        }
        return Stream.cons(paths) { from(more, explored.addAll(more.map { it.endState })) }
    }
}


So, now given a stream of potential paths from the initial state to a new state, a solution to a "target" state becomes:

val pathSets = from(hashSet(initialPath), hashSet())

fun solution(target: State): Stream<Path> {
    return pathSets.flatMap { it }.filter { path -> path.endState == target }
}



That covers the solution. A test with this code looks like this - there are two cups of 4 litre and 9 litre capacity, initially filled with 0 litres of water, and the final target state is to get the second cup filled with 6 litres of water:

val initialState = list(Cup(0, 4), Cup(0, 9))
val pouring = Pouring(initialState)

pouring.solution(list(Cup(0, 4), Cup(6, 9)))
    .take(1).forEach { path ->
        println(path)
    }

When run, this prints the following solution:

Fill(1) Pour(1,0) Empty(0) Pour(1,0) Empty(0) Pour(1,0) Fill(1) Pour(1,0) Empty(0) ---> List(Cup(0/4), Cup(6/9))
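
As a cross-check on that output, the same level-by-level lazy search can be sketched with only the Kotlin standard library, with the "sequence" builder standing in for Vavr's Stream. This is not the code from the repo: "Step", "successors", "levels" and "solve" are illustrative names, a state is reduced to a list of water levels, and, unlike the "from" function above, explored states are recorded as soon as they are seen:

```kotlin
// A state is the list of water levels; history holds the moves made so far.
data class Step(val state: List<Int>, val history: List<String>)

// Every state reachable in one move, paired with the name of the move.
fun successors(state: List<Int>, caps: List<Int>): List<Pair<String, List<Int>>> {
    val result = mutableListOf<Pair<String, List<Int>>>()
    for (i in state.indices) {
        result += "Empty($i)" to state.toMutableList().also { it[i] = 0 }
        result += "Fill($i)" to state.toMutableList().also { it[i] = caps[i] }
        for (j in state.indices) if (i != j) {
            val amount = minOf(state[i], caps[j] - state[j])
            result += "Pour($i,$j)" to state.toMutableList().also {
                it[i] -= amount
                it[j] += amount
            }
        }
    }
    return result
}

// Lazily yields the set of paths of length 0, 1, 2, ... moves.
// A level is only computed when the consumer asks for it.
fun levels(caps: List<Int>): Sequence<List<Step>> = sequence {
    var frontier = listOf(Step(List(caps.size) { 0 }, emptyList()))
    val explored = mutableSetOf(frontier.first().state)
    while (frontier.isNotEmpty()) {
        yield(frontier)
        val next = mutableListOf<Step>()
        for (step in frontier) {
            for ((move, state) in successors(step.state, caps)) {
                // add() returns false for states that were already explored
                if (explored.add(state)) next += Step(state, step.history + move)
            }
        }
        frontier = next
    }
}

// First path whose end state matches the target, or null if it is unreachable.
fun solve(caps: List<Int>, target: List<Int>): Step? =
    levels(caps).flatten().firstOrNull { it.state == target }

fun main() {
    println(solve(listOf(4, 9), listOf(0, 6))?.history?.joinToString(" "))
    // Fill(1) Pour(1,0) Empty(0) Pour(1,0) Empty(0) Pour(1,0) Fill(1) Pour(1,0) Empty(0)
}
```

The search stops pulling levels from the sequence as soon as the first matching path is found, so levels beyond the ninth are never computed.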

Graphically represented, the solution looks like this:



It may be easier to simply follow a working version of the sample which is available in my github repo - https://github.com/bijukunjummen/algos/blob/master/src/test/kotlin/pour/Pouring.kt


Conclusion

Although Kotlin lacks first class support for native immutable data structures, I feel that a combination of Vavr with Kotlin makes for a solution that is as elegant as the Scala one.

Monday, January 21, 2019

Spring-Boot 2.1.x and overriding bean definition

I was recently migrating an application from Spring Boot 1.5.X to Spring Boot 2.1.X and saw an issue with overriding Spring Bean definitions. One of the configurations was along these lines in Kotlin:


@Configuration
class DynamoConfig {

    @Bean
    fun dynamoDbAsyncClient(dynamoProperties: DynamoProperties): DynamoDbAsyncClient {
        ...
    }

    @Bean
    fun dynamoDbSyncClient(dynamoProperties: DynamoProperties): DynamoDbClient {
        ...
    }
}


Now, for a test I wanted to override these 2 bean definitions and did something along these lines:

@SpringBootTest
class DynamoConfigTest {

    @Test
    fun saveHotel() {
        val hotelRepo = DynamoHotelRepo(localDynamoExtension.asyncClient!!)
        val hotel = Hotel(id = "1", name = "test hotel", address = "test address", state = "OR", zip = "zip")
        val resp = hotelRepo.saveHotel(hotel)

        StepVerifier.create(resp)
                .expectNext(hotel)
                .expectComplete()
                .verify()
    }


    @TestConfiguration
    class SpringConfig {
        @Bean
        fun dynamoDbAsyncClient(dynamoProperties: DynamoProperties): DynamoDbAsyncClient {
            ...
        }

        @Bean
        fun dynamoDbSyncClient(dynamoProperties: DynamoProperties): DynamoDbClient {
            ...
        }
    }
}

This type of overriding works with Spring Boot 1.5.X but fails with Spring Boot 2.1.X with an error:

Invalid bean definition with name 'dynamoDbAsyncClient' defined in sample.dyn.repo.DynamoConfigTest$SpringConfig:.. 
There is already .. defined in class path resource [sample/dyn/config/DynamoConfig.class]] bound



I feel this behavior is right: not allowing beans to be overridden this way is the correct default for an application. However, I do want the ability to override the beans for tests. Thanks to a Stack Overflow answer and the Spring Boot 2.1.X release notes, the fix is to allow overrides using the property "spring.main.allow-bean-definition-overriding=true". With this change, the test looks like this:

@SpringBootTest(properties = ["spring.main.allow-bean-definition-overriding=true"])
class DynamoConfigTest {

    @Test
    fun saveHotel() {
        val hotelRepo = DynamoHotelRepo(localDynamoExtension.asyncClient!!)
        val hotel = Hotel(id = "1", name = "test hotel", address = "test address", state = "OR", zip = "zip")
        val resp = hotelRepo.saveHotel(hotel)

        StepVerifier.create(resp)
                .expectNext(hotel)
                .expectComplete()
                .verify()
    }


    @TestConfiguration
    class SpringConfig {
        @Bean
        fun dynamoDbAsyncClient(dynamoProperties: DynamoProperties): DynamoDbAsyncClient {
            ...
        }

        @Bean
        fun dynamoDbSyncClient(dynamoProperties: DynamoProperties): DynamoDbClient {
            ...
        }
    }
}

Friday, January 4, 2019

Unit testing DynamoDB applications using JUnit5

In a previous post I had described the new AWS SDK for Java 2 which provides non-blocking IO support for Java clients calling different AWS services. In this post I will go over an approach that I have followed to unit test the AWS DynamoDB calls.

There are a few ways to spin up a local version of DynamoDB -

1. AWS provides a DynamoDB local
2. Localstack provides a way to spin up a good number of AWS services locally
3. A docker version of DynamoDB Local
4. Dynalite, a node based implementation of DynamoDB


Now to be able to unit test an application, I need to be able to start up an embedded version of DynamoDB using one of these options right before a test runs and then shut it down after a test completes. There are three approaches that I have taken:

1. Using a JUnit 5 extension that internally brings up an AWS DynamoDB Local and spins it down after a test.
2. Using testcontainers to start up a docker version of DynamoDB Local
3. Using testcontainers to start up DynaLite

JUnit5 extension

A JUnit5 extension provides a convenient hook point to start up an embedded version of DynamoDB for tests. It works by pulling in a version of DynamoDB Local as a Maven dependency, declared here in a Gradle build:

dependencies {
    ...
    testImplementation("com.amazonaws:DynamoDBLocal:1.11.119")
    ...
}

A complication with this dependency is that there are native components (dll, .so etc) that the DynamoDB Local interacts with and to get these in the right place, I depend on a Gradle task:

task copyNativeDeps(type: Copy) {
    mkdir "build/native-libs"
    from(configurations.testCompileClasspath) {
        include '*.dll'
        include '*.dylib'
        include '*.so'
    }
    into 'build/native-libs'
}

test {
    dependsOn copyNativeDeps
}

which puts the native libs in build/native-libs folder, and the extension internally sets this path as a system property:

System.setProperty("sqlite4java.library.path", libPath.toAbsolutePath().toString())

Here is the codebase to the JUnit5 extension with all these already hooked up - https://github.com/bijukunjummen/boot-with-dynamodb/blob/master/src/test/kotlin/sample/dyn/rules/LocalDynamoExtension.kt

A test using this extension looks like this:

class HotelRepoTest {
    companion object {
        @RegisterExtension
        @JvmField
        val localDynamoExtension = LocalDynamoExtension()

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            val dbMigrator = DbMigrator(localDynamoExtension.syncClient!!)
            dbMigrator.migrate()
        }

    }
    @Test
    fun saveHotel() {
        val hotelRepo = DynamoHotelRepo(localDynamoExtension.asyncClient!!)
        val hotel = Hotel(id = "1", name = "test hotel", address = "test address", state = "OR", zip = "zip")
        val resp = hotelRepo.saveHotel(hotel)

        StepVerifier.create(resp)
                .expectNext(hotel)
                .expectComplete()
                .verify()
    }
}

The code can interact with a fully featured DynamoDB.

TestContainers with DynamoDB Local Docker


The JUnit5 extensions approach works well but it requires an additional dependency with native binaries to be pulled in. A cleaner approach may be to use the excellent Testcontainers to spin up a docker version of DynamoDB Local the following way:

class HotelRepoLocalDynamoTestContainerTest {
    @Test
    fun saveHotel() {
        val hotelRepo = DynamoHotelRepo(getAsyncClient(dynamoDB))
        val hotel = Hotel(id = "1", name = "test hotel", address = "test address", state = "OR", zip = "zip")
        val resp = hotelRepo.saveHotel(hotel)

        StepVerifier.create(resp)
                .expectNext(hotel)
                .expectComplete()
                .verify()
    }



    companion object {
        val dynamoDB: KGenericContainer = KGenericContainer("amazon/dynamodb-local:1.11.119")
                .withExposedPorts(8000)

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            dynamoDB.start()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            dynamoDB.stop()
        }

        fun getAsyncClient(dynamoDB: KGenericContainer): DynamoDbAsyncClient {
            val endpointUri = "http://" + dynamoDB.getContainerIpAddress() + ":" +
                    dynamoDB.getMappedPort(8000)
            val builder: DynamoDbAsyncClientBuilder = DynamoDbAsyncClient.builder()
                    .endpointOverride(URI.create(endpointUri))
                    .region(Region.US_EAST_1)
                    .credentialsProvider(StaticCredentialsProvider
                            .create(AwsBasicCredentials
                                    .create("acc", "sec")))
            return builder.build()
        }

        ...
    }
}

This code starts up DynamoDB Local in a container, with its port 8000 mapped to a random unoccupied port on the host, and the client is then created from the container's address and mapped port. There is a little Kotlin workaround that I had to do based on an issue reported here - https://github.com/testcontainers/testcontainers-java/issues/318


TestContainers with Dynalite


Dynalite is a JavaScript-based implementation of DynamoDB and can again be run for tests using the TestContainer approach. This time, however, there is already a TestContainer module for Dynalite. I found that it does not support JUnit5 and sent a pull request to provide this support; in the interim the raw docker image can be used, and this is what a test looks like:

class HotelRepoDynaliteTestContainerTest {
    @Test
    fun saveHotel() {
        val hotelRepo = DynamoHotelRepo(getAsyncClient(dynamoDB))
        val hotel = Hotel(id = "1", name = "test hotel", address = "test address", state = "OR", zip = "zip")
        val resp = hotelRepo.saveHotel(hotel)

        StepVerifier.create(resp)
                .expectNext(hotel)
                .expectComplete()
                .verify()
    }

    companion object {
        val dynamoDB: KGenericContainer = KGenericContainer("quay.io/testcontainers/dynalite:v1.2.1-1")
                .withExposedPorts(4567)

        @BeforeAll
        @JvmStatic
        fun beforeAll() {
            dynamoDB.start()
            val dbMigrator = DbMigrator(getSyncClient(dynamoDB))
            dbMigrator.migrate()
        }

        @AfterAll
        @JvmStatic
        fun afterAll() {
            dynamoDB.stop()
        }

        fun getAsyncClient(dynamoDB: KGenericContainer): DynamoDbAsyncClient {
            val endpointUri = "http://" + dynamoDB.getContainerIpAddress() + ":" +
                    dynamoDB.getMappedPort(4567)
            val builder: DynamoDbAsyncClientBuilder = DynamoDbAsyncClient.builder()
                    .endpointOverride(URI.create(endpointUri))
                    .region(Region.US_EAST_1)
                    .credentialsProvider(StaticCredentialsProvider
                            .create(AwsBasicCredentials
                                    .create("acc", "sec")))
            return builder.build()
        }
        ...
    }
}

Conclusion

All of the approaches are useful for testing integration with DynamoDB. My personal preference is the TestContainers approach if a docker agent is available, and the JUnit5 extension approach otherwise. Samples with fully working tests using all three approaches are available in my github repo - https://github.com/bijukunjummen/boot-with-dynamodb

Sunday, December 16, 2018

Reactive Spring Webflux with AWS DynamoDB

AWS has released version 2 of the AWS SDK for Java, and the SDK now supports non-blocking IO for the API calls of different AWS services. In this post I will be exploring the DynamoDB APIs of the AWS SDK 2.x and using the Spring Webflux stack to expose a reactive endpoint - this way the application is reactive end to end and presumably should use resources very efficiently (I have plans to do some tests on this set-up as a follow up).


Details of the Application


It may be easier to simply look at the code and follow it there - it is available in my GitHub repo.

The application is a simple one - to perform CRUD operation on a Hotel entity represented using the following Kotlin code:

data class Hotel(
        val id: String = UUID.randomUUID().toString(),
        val name: String? = null,
        val address: String? = null,
        val state: String? = null,
        val zip: String? = null
)

I want to expose endpoints to save and retrieve a hotel entity and to get the list of hotels by state.


Details of the AWS SDK 2


The package names of the AWS SDK 2 APIs all start with the "software.amazon.awssdk" prefix now. The client to interact with DynamoDB is created using code along these lines:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

val client: DynamoDbAsyncClient = DynamoDbAsyncClient.builder()
        .region(Region.of(dynamoProperties.region))
        .credentialsProvider(DefaultCredentialsProvider.builder().build())
        .build()


Once the DynamoDbAsyncClient instance is created, any operation using this client returns a Java 8 CompletableFuture type. For example, saving a Hotel entity:

val putItemRequest = PutItemRequest.builder()
        .tableName("hotels")
        .item(HotelMapper.toMap(hotel))
        .build()
        
val result: CompletableFuture<PutItemResponse> =
        dynamoClient.putItem(putItemRequest)

and in retrieving a record by id:

val getItemRequest: GetItemRequest = GetItemRequest.builder()
        .key(mapOf(Constants.ID to AttributeValue.builder().s(id).build()))
        .tableName(Constants.TABLE_NAME)
        .build()

val response: CompletableFuture<GetItemResponse> = dynamoClient.getItem(getItemRequest)

CompletableFuture provides a comprehensive set of functions to transform the results when available.
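
A few of those functions are shown in the sketch below. It uses only the JDK: "FakeItemResponse", "fetch", "hotelName" and "nameLength" are hypothetical stand-ins and not part of the AWS SDK, and the lookup is simulated in memory:

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for an SDK response; not an AWS SDK type.
data class FakeItemResponse(val item: Map<String, String>)

// Simulated asynchronous lookup that fails for the id "missing".
fun fetch(id: String): CompletableFuture<FakeItemResponse> =
    CompletableFuture.supplyAsync {
        if (id == "missing") throw IllegalStateException("no item for $id")
        FakeItemResponse(mapOf("id" to id, "name" to "test hotel"))
    }

fun hotelName(id: String): CompletableFuture<String> =
    fetch(id)
        .thenApply { resp -> resp.item.getValue("name") } // map the value once it is available
        .thenApply { name -> "Hotel: $name" }             // transformations can be chained
        .exceptionally { _ -> "UNKNOWN" }                 // recover if any earlier stage failed

// thenCompose chains a second asynchronous step that depends on the first result.
fun nameLength(id: String): CompletableFuture<Int> =
    hotelName(id).thenCompose { name -> CompletableFuture.supplyAsync { name.length } }

fun main() {
    println(hotelName("1").join())       // Hotel: test hotel
    println(hotelName("missing").join()) // UNKNOWN
    println(nameLength("1").join())      // 17
}
```

None of these calls block until "join()" is invoked; the transformations are registered up front and run when the result arrives.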

Integrating with Spring Webflux

Spring Webflux is a reactive web framework. The non-blocking IO support in AWS SDK 2 now makes it possible to write end-to-end reactive and non-blocking applications with DynamoDB. Spring Webflux uses reactor-core to provide reactive streams support, and the trick to integrating with AWS SDK 2 is to transform the Java 8 CompletableFuture to a reactor-core type. This is done the following way when retrieving an item from DynamoDB by id:

val getItemRequest: GetItemRequest = GetItemRequest.builder()
        .key(mapOf(Constants.ID to AttributeValue.builder().s(id).build()))
        .tableName(Constants.TABLE_NAME)
        .build()

return Mono.fromCompletionStage(dynamoClient.getItem(getItemRequest))
        .map { resp ->
            HotelMapper.fromMap(id, resp.item())
        }

Spring Webflux expects the return types of the different web endpoint method signatures to be of reactive types, so a typical endpoint for getting say a list of hotels is the following:

@RequestMapping(value = ["/hotels"], method = [RequestMethod.GET])
fun getHotelsByState(@RequestParam("state") state: String): Flux<Hotel> {
    return hotelRepo.findHotelsByState(state)
}

Spring Webflux also supports a functional way to describe the API of the application, so an equivalent API to retrieve a hotel by its id, but expressed as a functional DSL is the following:

@Configuration
class HotelAdditionalRoutes {

    @Bean
    fun routes(hotelRepo: HotelRepo) = router {
        GET("/hotels/{id}") { req ->
            val id = req.pathVariable("id")
            val response: Mono<ServerResponse> = hotelRepo.getHotel(id)
                    .flatMap { hotel ->
                        ServerResponse.ok().body(BodyInserters.fromObject(hotel))
                    }
            response.switchIfEmpty(ServerResponse.notFound().build())
        }
    }
}


Conclusion

AWS SDK 2 makes it simple to write end-to-end reactive and non-blocking applications. I have used Spring Webflux and the AWS SDK 2 Dynamo client to write such an application here. The entire working sample is available in my GitHub repo - https://github.com/bijukunjummen/boot-with-dynamodb, and has instructions on how to start up a local version of DynamoDB and use it for testing the application.


Monday, October 29, 2018

Helm chart to deploy and scale a generic app image

This is a post about a simple helm chart that I have worked on to deploy any generic app image to Kubernetes. The chart is available here (https://github.com/bijukunjummen/generic-app-chart).

It tries to solve the issue of having to manage a set of raw Kubernetes resources (deployment, secrets, hpa) by letting helm manage these resources instead. The chart is generic enough that it should be able to handle most 12-factor compliant app images.


Consider a simple app that I have here - https://github.com/bijukunjummen/sample-boot-knative, an image for this application is publicly available on dockerhub - https://hub.docker.com/r/bijukunjummen/sample-boot-knative-app/

If I wanted to deploy this app in a Kubernetes cluster, a set of specs to create a Kubernetes Deployment and a service is available here - https://github.com/bijukunjummen/sample-boot-knative/tree/master/kube. This is a simple enough deployment, however, the specs can get complicated once configuration and secrets are layered in and if features like horizontal scaling are required.

Usage


There is good documentation in the README of the chart, and I will mostly be repeating that information here. I have hosted a version of the chart as a chart repository using github-pages. The first step to using the chart is to add this repo to your list of helm repos:

helm repo add bk-charts http://bijukunjummen.github.io/generic-app-chart
helm repo update

The chart should now show up if searched for:

helm search generic-app-chart


The chart requires the details of the application that is being deployed, which can be provided as a yaml the following way:

app:
  healthCheckPath: /actuator/info
  environment:
    SERVER_PORT: "8080"
    ENV_NAME: ENV_VALUE
  secrets:
    SECRET1: SECRET_VALUE1
  autoscaling:
    enabled: true
    maxReplicas: 2
    minReplicas: 1
    targetCPUUtilizationPercentage: 40    

image:
  repository: bijukunjummen/sample-boot-knative-app
  tag: 0.0.3-SNAPSHOT    

ingress:
  enabled: true
  path: /
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/affinity: "cookie"
    nginx.ingress.kubernetes.io/session-cookie-name: "route"

resources:
  constraints: 
    enabled: true
  requests:
    cpu: 500m


At a minimum, the only details required are the application image and the tag; the rest of the details just illustrate what is feasible.

To deploy the app, run the following command:

helm install bk-charts/generic-app-chart  --name my-app --values sample-values.yaml

and a bunch of Kubernetes resources should show up supporting this application.


App upgrades are simple, facilitated by helm:

helm upgrade my-app bk-charts/generic-app-chart -f sample-values.yaml


Conclusion

The chart is fairly minimal at this point and creates a small set of Kubernetes resources - a secret to hold secrets!, a deployment, a service, and an hpa to scale the app, which suffices for the kind of use cases that I have encountered so far.

Saturday, September 22, 2018

Knative serving - using Ambassador gateway

This is a continuation of my experimentation with Knative serving, this time around building a gateway on top of a Knative serving application. This builds on two of my previous posts - on using Knative to deploy a Spring Boot App and making a service to service call in Knative.

Why a Gateway on top of a Knative application


To explain this let me touch on my previous blog post. Assuming that Knative serving is already available in a Kubernetes environment, the way to deploy an application is using a manifest which looks like this:

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: sample-boot-knative-service
  namespace: default
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            image: bijukunjummen/sample-boot-knative-app:0.0.3-SNAPSHOT
            env:
            - name: ASAMPLE_ENV
              value: "sample-env-val"


Now to invoke this application, I have to make the call via an ingress created by Knative serving, which can be obtained the following way in a minikube environment:

export GATEWAY_URL=$(echo $(minikube ip):$(kubectl get svc knative-ingressgateway -n istio-system -o 'jsonpath={.spec.ports[?(@.port==80)].nodePort}'))

The request now has to go through the ingress and the ingress uses a Host http header to then route the request to the app. The host header for the deployed service can be obtained using the following bash script:

export APP_DOMAIN=$(kubectl get services.serving.knative.dev sample-boot-knative-service  -o="jsonpath={.status.domain}")

and then a call via the knative ingress gateway can be made the following way, using curl:

curl -X "POST" "http://${GATEWAY_URL}/messages" \
     -H "Accept: application/json" \
     -H "Content-Type: application/json" \
     -H "Host: ${APP_DOMAIN}" \
     -d $'{
  "id": "1",
  "payload": "one",
  "delay": "300"
}'

or using httpie:

http http://${GATEWAY_URL}/messages Host:"${APP_DOMAIN}" id=1 payload=test delay=1
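The value that ends up in APP_DOMAIN is not arbitrary. Assuming Knative's default domain configuration (the example.com domain and a name.namespace.domain template), it can be predicted from the service name and namespace, and it matches the host that shows up later in this post. The knative_host function below is a hypothetical helper of my own, not part of any CLI, and a cluster with a customised domain will produce different values.

```shell
# Build the expected Host header value for a Knative service.
# Assumes the default {name}.{namespace}.{domain} template and the
# default example.com domain; a customised cluster will differ.
knative_host() {
  local name="$1"
  local namespace="${2:-default}"
  local domain="${3:-example.com}"
  printf '%s.%s.%s\n' "$name" "$namespace" "$domain"
}

# Prints the host for the service deployed earlier in the default namespace
knative_host sample-boot-knative-service
```

Querying the status of the service, as done above, remains the reliable way to get the domain; the helper only illustrates the pattern behind it.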

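There are too many steps involved in making a call to the application via the knative ingress: looking up the gateway address, looking up the app's domain, and setting the Host header on every request.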



My objective in this post is to simplify the user's experience in making a call to the app by using a Gateway like Ambassador.


Integrating Ambassador to Knative


There is nothing special about installing Ambassador into a Knative environment; the excellent instructions provided here worked cleanly in my minikube environment.

Now my objective with the gateway is summarized in this picture:


With Ambassador in place, all the user has to do is send a request to the Ambassador Gateway, which takes care of plugging in the Host header before making a request to the Knative Ingress.

So how does this work? Fairly easily! Assuming Ambassador is in place, all it needs is a configuration which piggybacks on a Kubernetes service the following way:

---
apiVersion: v1
kind: Service
metadata:
  name: sample-knative-app-gateway
  annotations:
    getambassador.io/config: |
      ---
      apiVersion: ambassador/v0
      kind:  Mapping
      name: sample-boot-knative-app
      prefix: /messages
      rewrite: /messages
      service: knative-ingressgateway.istio-system.svc.cluster.local 
      host_rewrite: sample-boot-knative-service.default.example.com
spec:
  type: LoadBalancer
  ports:
  - name: ambassador
    port: 80
    targetPort: 80
  selector:
    service: ambassador

Here I am providing configuration via a Service annotation, intercepting any calls to the /messages uri, forwarding these requests to the knative ingressgateway service (knative-ingressgateway.istio-system.svc.cluster.local) and adding the host header of "sample-boot-knative-service.default.example.com".
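To make the effect of the Mapping concrete, here is a rough simulation of the routing decision in plain shell. It is only a sketch under my own assumptions: route_request is a made-up function, the prefix is treated as a plain string prefix match, and nothing else that Ambassador does (retries, load balancing and so on) is modelled.

```shell
# Mimic the Mapping above: report the upstream, Host header and path
# that a request to the gateway would end up with.
# Simplification: only the prefix match and the host rewrite are modelled.
route_request() {
  local path="$1"
  case "$path" in
    /messages*)
      # rewrite: /messages replaces the matched prefix with itself,
      # so the path is forwarded unchanged
      printf 'upstream=%s host=%s path=%s\n' \
        "knative-ingressgateway.istio-system.svc.cluster.local" \
        "sample-boot-knative-service.default.example.com" \
        "$path"
      ;;
    *)
      # No Mapping matches this path, so the sketch reports a miss
      echo "no-mapping"
      return 1
      ;;
  esac
}

route_request /messages
```

The caller never has to know the Host header; it is attached on the way to the Knative ingress, which is exactly the part that had to be done by hand earlier.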


Now the interaction from a user perspective is far simpler. All I have to do is get the url for this new service and make the api call, which in a minikube environment can be done using the following bash script:

export AMB_URL=$(echo $(minikube ip):$(kubectl get svc sample-knative-app-gateway -n default -o 'jsonpath={.spec.ports[?(@.port==80)].nodePort}'))

http http://${AMB_URL}/messages id=1 payload=test delay=1


It may be easier to try this with real code, which is available in my github repo here.