Friday, August 28, 2015

Couchbase Java SDK with Rx-Java

A neat thing about the Couchbase Java SDK is that it is built on top of the excellent Rx-Java library. This enables a reactive way of interacting with a Couchbase server instance, which is very intuitive once you get the hang of it.

Consider a very simple json document that I intend to store in Couchbase:

{"key":"1","value":"one"}


and a Java class to hold this json:

public class KeyVal {
    private String key;
    private String value;

    ...
}

The following is the code to insert an instance of KeyVal to a Couchbase bucket:

JsonObject jsonObject = JsonObject.empty().put("key", keyVal.getKey()).put("value", keyVal.getValue());
JsonDocument doc = JsonDocument.create(keyVal.getKey(), jsonObject);
Observable<JsonDocument> obs = bucket
                .async()
                .insert(doc);

The return type of insert is an Observable, so if I need to map the result back to a KeyVal I can use the extensive mapping support provided by the Observable class:

Observable<KeyVal> obs = bucket
                .async()
                .insert(doc)
                .map(jsonDoc -> 
                    new KeyVal(jsonDoc.id(), jsonDoc.content().getString("value"))
                );



Other APIs follow a similar pattern, for example, to retrieve the saved document:

bucket
                .async()
                .get(id)
                .map(doc ->
                        new KeyVal(doc.id(),
                                doc.content().getString("value")));

If you are interested in exploring this sample further, here is my github repo with a working example - https://github.com/bijukunjummen/sample-karyon2-couch



Tuesday, August 18, 2015

Spring boot static web resource handling for Single Page Applications

Javascript build tools like gulp and grunt truly boggle my mind. I look at one of the build scripts for these tools, find it difficult to get my head around it, and cannot imagine writing one from scratch. This is where yeoman comes in: a very handy tool that quickly bootstraps a good starter project using any of the myriad combinations of Javascript build tools.

I wanted to explore an approach which Spring framework recommends for handling static web resources, which is to use these very capable build tools for building the static assets and using Spring for serving out the content once the static assets are built into a distributable state.

My approach was to use yeoman to generate a starter project. I chose gulp-angular as my base and quickly generated a complete project, available here. I was able to expand this template into a fairly comprehensive angularjs based single page application which delegates back to Spring based REST calls to service the UI.

The steps I followed are listed below; they are mostly copied from the excellent sample created by Brian Clozel.

If you want to follow along, the end result is available in my github repo.


  1. Define two modules, the "client" module to hold the generated yeoman template and a "server" module to hold the Spring Boot application.
  2. Hack away on the "client" module; in this specific instance I have created a simple angularjs based application.
  3. I am using maven as the java build tool, so I have a wrapper maven pom file which triggers the javascript build chain as part of the maven build cycle, then takes the built artifacts and creates a client jar out of them. The static content is placed at a location that Spring Boot can get to, in this instance classpath:/static.
  4. In the "server" module, "client" is added as a dependency and "server" is set up to run as a full-fledged Spring Boot project.
  5. Serve out the project from the server module by executing:
    mvn spring-boot:run
    

Conclusion


Spring Boot takes an excellent approach to the asset pipeline for static web resources: it does not interfere with the very capable build tools in the Javascript universe, and it provides a clean way to serve out the generated static content.

Saturday, August 8, 2015

Working with Spring-Cloud and Netflix Archaius

Background

Spring Cloud provides all the tools that you require to create cloud ready microservices. One of the infrastructure components that Spring-Cloud provides is a Configuration server to centralize the properties of an application; however, you may be using other solutions to manage the properties. One such solution is Netflix Archaius, and if you work with it, Spring-Cloud provides a neat way to integrate with it.

Integration With Archaius

Spring Cloud provides a Spring Boot auto-configuration for Archaius which is triggered on finding the Archaius related libraries on the application's classpath. The first step is to pull in the Archaius libraries, which can be done through the following dependency entry in the POM file:
<dependency>
    <groupId>com.netflix.archaius</groupId>
    <artifactId>archaius-core</artifactId>
</dependency>
Note that the version of the dependency need not be specified; this information flows in from the dependency management information in the parent POMs.
With this library in place, all that now needs to be done is to define Spring beans which extend the Apache Commons Configuration AbstractConfiguration class, and these are automatically configured by Spring Cloud as Archaius configuration. As an example, consider the following AbstractConfiguration which has one property in it:
@Bean
public AbstractConfiguration sampleArchaiusConfiguration() throws Exception {
    ConcurrentMapConfiguration concurrentMapConfiguration = new ConcurrentMapConfiguration();
    concurrentMapConfiguration.addProperty("testkey", "testvalue");
    return concurrentMapConfiguration;
}

That is essentially it. This property should now be visible as an Archaius property and can be accessed along these lines:

DynamicPropertyFactory.getInstance().getStringProperty("testkey", "").get()


Also there are a few more neat features provided through Archaius integration in Spring-Cloud:
  1. The Spring managed properties are visible as Archaius properties
  2. An endpoint(/archaius) is provided by Spring-Cloud where all the registered archaius properties can be viewed

Conclusion

Spring Cloud natively provides all the tools to write a cloud ready microservice. However, the centralized properties may be managed via Netflix Archaius, and if that is the case, Spring Cloud provides this neat way to integrate with Archaius.

Tuesday, July 28, 2015

Spring Boot @ConfigurationProperties

Spring Boot provides a very neat way to load properties for an application. Consider a set of properties described using yaml format:

prefix:
    stringProp1: propValue1
    stringProp2: propValue2
    intProp1: 10
    listProp:
        - listValue1
        - listValue2
    mapProp:
        key1: mapValue1
        key2: mapValue2

These entries can also be described in a traditional application.properties file the following way:

prefix.stringProp1=propValue1
prefix.stringProp2=propValue2
prefix.intProp1=10
prefix.listProp[0]=listValue1
prefix.listProp[1]=listValue2
prefix.mapProp.key1=mapValue1
prefix.mapProp.key2=mapValue2

It has taken me a little while, but I do like the hierarchical look of the properties described in a yaml format.

So now, given this property file a traditional Spring application would have loaded up the properties the following way:

public class SamplePropertyLoadingTest {
    @Value("${prefix.stringProp1}")
    private String stringProp1;

    ...
}

Note the placeholder for the "prefix.stringProp1" key.

This however is not ideal for loading a family of related properties, say in this specific case namespaced by the prefix conveniently named "prefix".

The approach Spring boot takes is to define a bean that can hold all the family of related properties this way:

@ConfigurationProperties(prefix = "prefix")
@Component
public class SampleProperty {
    private String stringProp1;
    private String stringProp2;
    @Max(99)
    @Min(0)
    private Integer intProp1;
    private List<String> listProp;
    private Map<String, String> mapProp;
    
    ...
}

At runtime, all the fields would be bound to the related properties cleanly.

Additionally, note the JSR-303 annotations on the "intProp1" field, which validate that the value of the field is between 0 and 99. @ConfigurationProperties will call the validator to ensure that the bound bean is validated.
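To make the idea of binding a family of properties concrete, here is a minimal plain-Java sketch of what prefix-based binding amounts to for the flat application.properties form shown earlier. It is an illustration only and not how Spring Boot implements @ConfigurationProperties: the PrefixBindingSketch class and its fromText helper are hypothetical names, and the range check is hand-written to mirror what @Min(0) and @Max(99) express declaratively.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical holder, bound by hand from flat "prefix.*" keys.
class PrefixBindingSketch {
    final String stringProp1;
    final int intProp1;
    final List<String> listProp = new ArrayList<>();
    final Map<String, String> mapProp = new TreeMap<>();

    PrefixBindingSketch(Properties props, String prefix) {
        stringProp1 = props.getProperty(prefix + ".stringProp1");
        intProp1 = Integer.parseInt(props.getProperty(prefix + ".intProp1"));
        // Hand-written equivalent of the @Min(0) / @Max(99) constraints.
        if (intProp1 < 0 || intProp1 > 99) {
            throw new IllegalArgumentException("intProp1 must be between 0 and 99");
        }
        // Indexed keys (listProp[0], listProp[1], ...) become list elements.
        for (int i = 0; props.containsKey(prefix + ".listProp[" + i + "]"); i++) {
            listProp.add(props.getProperty(prefix + ".listProp[" + i + "]"));
        }
        // Keys nested under "mapProp." become map entries.
        String mapPrefix = prefix + ".mapProp.";
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(mapPrefix)) {
                mapProp.put(name.substring(mapPrefix.length()), props.getProperty(name));
            }
        }
    }

    // Parses properties-file text and binds everything under "prefix".
    static PrefixBindingSketch fromText(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new PrefixBindingSketch(props, "prefix");
    }
}
```

The point of the sketch is only the shape of the mapping: one prefix, with scalar, indexed and nested keys landing in a String, a List and a Map respectively. Spring Boot does all of this (plus type conversion and relaxed key names) from the annotation alone.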

An integration test making use of this is the following:

package prop;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SampleWebApplication.class)
public class SamplePropertyLoadingTest {
    @Autowired
    private SampleProperty sampleProperty;

    @Value("${prefix.stringProp1}")
    private String stringProp1;

    @Test
    public void testLoadingOfProperties() {
        System.out.println("stringProp1 = " + stringProp1);
        assertThat(sampleProperty.getStringProp1(), equalTo("propValue1"));
        assertThat(sampleProperty.getStringProp2(), equalTo("propValue2"));
        assertThat(sampleProperty.getIntProp1(), equalTo(10));
        assertThat(sampleProperty.getListProp(), hasItems("listValue1", "listValue2"));
        assertThat(sampleProperty.getMapProp(), allOf(hasEntry("key1", "mapValue1"),
                hasEntry("key2", "mapValue2")));
    }
}

If you are interested in exploring this sample further, I have a github repo with the code checked in here.

Thursday, July 23, 2015

Scatter Gather - Using Java 8 CompletableFuture and Rx-Java Observable

I wanted to explore a simple scatter-gather scenario using Java 8 CompletableFuture and using Rx-Java Observable.


The scenario is simple - Spawn about 10 tasks, each returning a string, and ultimately collect the results into a list.

Sequential

A sequential version of this would be the following:

public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

 logger.info(list.toString());
}

private String generateTask(int i) {
 Util.delay(2000);
 return i + "-" + "test";
}

With CompletableFuture

A method can be made to return a CompletableFuture using a utility method called supplyAsync. I am using a variation of this method which accepts an explicit Executor, and I am deliberately throwing an exception for one of the inputs:

private CompletableFuture<String> generateTask(int i,
  ExecutorService executorService) {
 return CompletableFuture.supplyAsync(() -> {
  Util.delay(2000);
  if (i == 5) {
   throw new RuntimeException("Run, it is a 5!");
  }
  return i + "-" + "test";
 }, executorService);
}

Now to scatter the tasks:

List<CompletableFuture<String>> futures =
  IntStream.range(0, 10)
    .boxed()
    .map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage()))
    .collect(Collectors.toList());

At the end of scattering the tasks, the result is a list of CompletableFuture. Obtaining the list of String from this is a little tricky; here I am using one of the solutions suggested on Stack Overflow:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
  .thenApply(v -> futures.stream()
       .map(CompletableFuture::join)
       .collect(Collectors.toList()));

The CompletableFuture.allOf method is being used here purely to compose the next action to take once all the scattered tasks are completed. Once the tasks are completed, the futures are streamed again and their results collected into a list of strings.

The final result can then be presented asynchronously:
result.thenAccept(l -> {
 logger.info(l.toString());
});
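
Putting the pieces above together, a self-contained version of the CompletableFuture scatter-gather might look like the sketch below. The class name ScatterGatherSketch, the pool size of 4 and the "failed-" marker are my assumptions for illustration; the artificial delay is omitted, and the final join() blocks only to keep the example short.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ScatterGatherSketch {

    // One scattered task; input 5 fails on purpose, as in the snippets above.
    static CompletableFuture<String> generateTask(int i, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            if (i == 5) {
                throw new RuntimeException("Run, it is a 5!");
            }
            return i + "-test";
        }, executor);
    }

    // Scatters `count` tasks, then gathers the results in submission order.
    static List<String> scatterGather(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = IntStream.range(0, count)
                    .boxed()
                    .map(i -> generateTask(i, executor)
                            // Replace a failure with a marker so allOf() completes normally.
                            .exceptionally(t -> "failed-" + i))
                    .collect(Collectors.toList());

            CompletableFuture<List<String>> result = CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(v -> futures.stream()
                            // join() returns immediately here: every future is already done.
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));

            return result.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(scatterGather(10));
    }
}
```

Because the results are read back from the original list of futures, they come out in the order the tasks were submitted, regardless of which task finished first.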


With Rx-java Observable

Scatter gather with Rx-java is relatively cleaner than the CompletableFuture version, as Rx-java provides better ways to compose the results together. Again, here is the method which performs the scattered task:

private Observable<String> generateTask(int i, ExecutorService executorService) {
    return Observable
            .<String>create(s -> {
                Util.delay(2000);
                if ( i == 5) {
                    throw new RuntimeException("Run, it is a 5!");
                }
                s.onNext( i + "-test");
                s.onCompleted();
            }).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}

and to scatter the tasks:

List<Observable<String>> obs =
        IntStream.range(0, 10)
            .boxed()
            .map(i -> generateTask(i, executors)).collect(Collectors.toList());

Once more I have a List of Observables, and what I need is a List of results. Observable provides a merge method to do just that:

Observable<List<String>> merged = Observable.merge(obs).toList();

which can be subscribed to and the results printed when available:

merged.subscribe(
                l -> logger.info(l.toString()));

Friday, July 10, 2015

Rx-netty and Karyon2 based cloud ready microservice - Dependency Injection

I had previously written about using Rx-netty and Karyon2 for developing cloud ready microservices. There were a few issues with the sample though, which is partly reproduced here:

package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

The issues are:

  1. The routing logic is not centralized, the request handler has both the routing logic and the processing logic
  2. The dependencies are not injected in cleanly.


Looking at the Karyon2 samples, both of these issues are now very cleanly addressed, which I wanted to document here.

Routing

Routing can be centralized using a custom Rx-netty RequestHandler called the SimpleUriRouter.
The routes can be registered the following way using SimpleUriRouter, which is being created here using a Guice Provider:

import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;

public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {

    @Inject
    private HealthCheck healthCheck;

    @Inject
    private ApplicationMessageHandler applicationMessageHandler;

    @Override
    public SimpleUriRouter<ByteBuf, ByteBuf> get() {
        SimpleUriRouter<ByteBuf, ByteBuf> simpleUriRouter = new SimpleUriRouter<>();
        simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
        simpleUriRouter.addUri("/message", applicationMessageHandler);
        return simpleUriRouter;
    }
}

This router can now be registered via a custom guice module the following way:

public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {

    public KaryonAppModule() {
        super("routerModule", ByteBuf.class, ByteBuf.class);
    }

    @Override
    protected void configureServer() {
        bindRouter().toProvider(new AppRouteProvider());

        interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);

        server().port(8888);
    }
}

This is essentially it. The routing logic is now cleanly separated from the processing logic.
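
To show the idea of centralized routing in isolation, here is a minimal plain-Java sketch of a prefix-based router. It is not Karyon's SimpleUriRouter, whose matching rules may differ; UriRouterSketch, its addUri and route methods, and the String-based handlers are all hypothetical simplifications standing in for the real RequestHandler types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical mini-router: maps URI prefixes to handlers of the request body.
class UriRouterSketch {
    // LinkedHashMap keeps registration order, so the first matching prefix wins.
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    UriRouterSketch addUri(String prefix, Function<String, String> handler) {
        routes.put(prefix, handler);
        return this;
    }

    // All routing decisions live here; handlers only process the body.
    String route(String uri, String body) {
        for (Map.Entry<String, Function<String, String>> entry : routes.entrySet()) {
            if (uri.startsWith(entry.getKey())) {
                return entry.getValue().apply(body);
            }
        }
        return "404 Not Found";
    }

    public static void main(String[] args) {
        UriRouterSketch router = new UriRouterSketch()
                .addUri("/healthcheck", body -> "OK")
                .addUri("/message", body -> "Pong: " + body);
        System.out.println(router.route("/message", "hello"));
        System.out.println(router.route("/unknown", ""));
    }
}
```

Compared with the original RxNettyHandler, the if/else chain on the request URI disappears from the handlers and adding a route becomes a one-line registration.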

Dependency Injection


Dependency injection is handled via custom guice modules. I have a service, call it the MessageHandlerService, which takes in a message and returns an acknowledgement. The implementation of this service is defined as follows:

public class MessageHandlerServiceImpl implements MessageHandlerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        return Observable.<MessageAcknowledgement>create(s -> {
            s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
            s.onCompleted();
        });
    }


}

Now, I have a guice module which specifies the binding between MessageHandlerService interface and the concrete MessageHandlerServiceImpl:

public class AppModule extends AbstractModule {


    @Override
    protected void configure() {
        bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
    }
}


With this in place, the MessageHandlerService can be injected in:

public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final MessageHandlerService messageHandlerService;

    @Inject
    public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -> {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .flatMap(messageHandlerService::handleMessage)
                .flatMap(ack -> {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    }
}
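
One practical benefit of the constructor injection shown above is that the handler can be exercised with a stub service and no container at all. The following plain-Java sketch illustrates that under simplifying assumptions: InjectionSketch and its nested types are hypothetical, synchronous stand-ins (no Guice, no Observable), and the wiring that a Guice binding would perform is done by hand.

```java
// Hypothetical, simplified stand-ins for the service and handler above.
class InjectionSketch {

    interface MessageService {
        String handleMessage(String payload);
    }

    // Plays the role of the concrete implementation bound in the module.
    static class PongMessageService implements MessageService {
        @Override
        public String handleMessage(String payload) {
            return payload + " -> Pong";
        }
    }

    // The handler depends only on the interface, received through its constructor.
    static class MessageHandler {
        private final MessageService service;

        MessageHandler(MessageService service) {
            this.service = service;
        }

        String handle(String requestBody) {
            return service.handleMessage(requestBody.trim());
        }
    }

    public static void main(String[] args) {
        // Manual wiring: the step an injector performs from the binding.
        MessageHandler production = new MessageHandler(new PongMessageService());
        System.out.println(production.handle(" hello "));

        // In a test, a lambda can stand in for the real service.
        MessageHandler underTest = new MessageHandler(payload -> "stub:" + payload);
        System.out.println(underTest.handle("hello"));
    }
}
```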


With both these features implemented, the app using Karyon2 is also greatly simplified and I have the complete working app in my github repository here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong

Monday, June 29, 2015

Learning Spring-Cloud - Writing a microservice

Continuing my Spring-Cloud learning journey: earlier I had covered how to write the infrastructure components of a typical Spring-Cloud and Netflix OSS based micro-services environment, in this specific instance two critical components, Eureka to register and discover services and Spring Cloud Configuration to maintain a centralized repository of configuration for a service. Here I will show how I developed two dummy micro-services, a simple "pong" service and a "ping" service which uses the "pong" service.


Sample-Pong microservice


The endpoint handling the "ping" requests is a typical Spring MVC based endpoint:

@RestController
public class PongController {

    @Value("${reply.message}")
    private String message;

    @RequestMapping(value = "/message", method = RequestMethod.POST)
    public Resource<MessageAcknowledgement> pongMessage(@RequestBody Message input) {
        return new Resource<>(
                new MessageAcknowledgement(input.getId(), input.getPayload(), message));
    }

}

It gets a message and responds with an acknowledgement. Here the service utilizes the Configuration server in sourcing the "reply.message" property. So how does the "pong" service find the configuration server? There are potentially two ways: directly by specifying the location of the configuration server, or by finding the Configuration server via Eureka. I am used to an approach where Eureka is considered a source of truth, so in this spirit I am using Eureka to find the Configuration server. Spring Cloud makes this entire flow very simple; all it requires is a "bootstrap.yml" property file with entries along these lines:

---
spring:
  application:
    name: sample-pong
  cloud:
    config:
      discovery:
        enabled: true
        serviceId: SAMPLE-CONFIG

eureka:
  instance:
    nonSecurePort: ${server.port:8082}
  client:
    serviceUrl:
      defaultZone: http://${eureka.host:localhost}:${eureka.port:8761}/eureka/

The location of Eureka is specified through the "eureka.client.serviceUrl" property and the "spring.cloud.config.discovery.enabled" is set to "true" to specify that the configuration server is discovered via the specified Eureka server.
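
The bootstrap.yml above relies on placeholders with defaults, such as ${eureka.host:localhost}. As a rough illustration of how such a value resolves, here is a small plain-Java sketch. It is not Spring's resolver: PlaceholderSketch and resolve are hypothetical names, nested placeholders are not handled, and a placeholder with neither a value nor a default is simply left untouched here.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for "${name:default}" placeholders.
class PlaceholderSketch {
    // Group 1 is the property name, optional group 2 is the default value.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String template, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // Without a default, fall back to the placeholder text itself.
            String fallback = matcher.group(2) != null ? matcher.group(2) : matcher.group(0);
            String value = props.getOrDefault(matcher.group(1), fallback);
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

With no properties set, the defaultZone template from the file resolves to http://localhost:8761/eureka/; supplying eureka.host (for example from a docker-compose environment) changes only the host part.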

Just a note: this means that Eureka and the Configuration server have to be completely up before trying to bring up the actual services. They are prerequisites, and the underlying assumption is that the infrastructure components are available at application boot time.

The Configuration server has the properties for the "sample-pong" service. This can be validated by using the Config server's endpoint, http://localhost:8888/sample-pong/default (8888 is the port that I had specified for the server), which should respond with content along these lines:

{
  "name": "sample-pong",
  "profiles": [
    "default"
  ],
  "label": "master",
  "propertySources": [
    {
      "name": "classpath:/config/sample-pong.yml",
      "source": {
        "reply.message": "Pong"
      }
    }
  ]
}

As can be seen, the "reply.message" property from this central configuration server will be used by the pong service as the acknowledgement message.

Now to set up this endpoint as a service, all that is required is a Spring-boot based entry point along these lines:

@SpringBootApplication
@EnableDiscoveryClient
public class PongApplication {
    public static void main(String[] args) {
        SpringApplication.run(PongApplication.class, args);
    }
}

and that completes the code for the "pong" service.


Sample-ping micro-service


So now onto a consumer of the "pong" micro-service, very imaginatively named the "ping" micro-service. Spring-Cloud and Netflix OSS offer a lot of options to invoke endpoints on Eureka registered services, to summarize the options that I had:

1. Use raw Eureka DiscoveryClient to find the instances hosting a service and make calls using Spring's RestTemplate.

2. Use Ribbon, a client side load balancing solution which can use Eureka to find service instances.

3. Use Feign, which provides a declarative way to invoke a service call. It internally uses Ribbon.

I went with Feign. All that is required is an interface which shows the contract to invoke the service:

package org.bk.consumer.feign;

import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

@FeignClient("samplepong")
public interface PongClient {

    @RequestMapping(method = RequestMethod.POST, value = "/message",
            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    MessageAcknowledgement sendMessage(@RequestBody Message message);
}

The annotation @FeignClient("samplepong") internally points to a Ribbon "named" client called "samplepong". This means that there has to be an entry in the property files for this named client, in my case I have these entries in my application.yml file:

samplepong:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    NIWSServerListClassName: com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
    ReadTimeout: 5000
    MaxAutoRetries: 2

The most important entry here is "samplepong.ribbon.DeploymentContextBasedVipAddresses", which points to the "pong" service's Eureka registration address, using which the service instance will be discovered by Ribbon.

The rest of the application is a routine Spring Boot application. I have exposed this service call behind Hystrix which guards against service call failures and essentially wraps around this FeignClient:

package org.bk.consumer.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.bk.consumer.feign.PongClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service("hystrixPongClient")
public class HystrixWrappedPongClient implements PongClient {

    @Autowired
    @Qualifier("pongClient")
    private PongClient feignPongClient;

    @Override
    @HystrixCommand(fallbackMethod = "fallBackCall")
    public MessageAcknowledgement sendMessage(Message message) {
        return this.feignPongClient.sendMessage(message);
    }

    public MessageAcknowledgement fallBackCall(Message message) {
        MessageAcknowledgement fallback = new MessageAcknowledgement(message.getId(), message.getPayload(), "FAILED SERVICE CALL! - FALLING BACK");
        return fallback;
    }
}
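
The fallback idea on its own can be sketched in a few lines of plain Java. This is only the "call, and on failure return a fallback" part; Hystrix additionally provides timeouts, thread isolation and circuit breaking, none of which are modeled here. FallbackSketch and withFallback are hypothetical names.

```java
import java.util.function.Function;

// Hypothetical helper showing only the fallback part of the pattern.
class FallbackSketch {

    // Wraps a call so that a runtime failure yields the fallback's result instead.
    static <T, R> Function<T, R> withFallback(Function<T, R> primary, Function<T, R> fallback) {
        return input -> {
            try {
                return primary.apply(input);
            } catch (RuntimeException e) {
                return fallback.apply(input);
            }
        };
    }

    public static void main(String[] args) {
        // Simulates a remote call that is currently failing.
        Function<String, String> failingCall = message -> {
            throw new IllegalStateException("pong service is down");
        };
        Function<String, String> guarded = withFallback(
                failingCall,
                message -> "FAILED SERVICE CALL! - FALLING BACK (" + message + ")");
        System.out.println(guarded.apply("hello"));
    }
}
```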


"Boot"ing up


I have dockerized my entire set-up, so the simplest way to start up the set of applications is to first build the docker images for all of the artifacts this way:

mvn clean package docker:build -DskipTests

and bring all of them up using the following command, the assumption being that both docker and docker-compose are available locally:

docker-compose up

Assuming everything comes up cleanly, Eureka should show all the registered services at http://dockerhost:8761.

The UI of the ping application should be available at http://dockerhost:8080.

Additionally, a Hystrix dashboard should be available to monitor the requests to the "pong" app at this url: http://dockerhost:8989/hystrix/monitor?stream=http%3A%2F%2Fsampleping%3A8080%2Fhystrix.stream



References


1. The code is available at my github location - https://github.com/bijukunjummen/spring-cloud-ping-pong-sample

2. Most of the code is heavily borrowed from the spring-cloud-samples repository - https://github.com/spring-cloud-samples