Saturday, May 30, 2015

Rx-netty and Karyon2 based cloud ready microservice

Netflix Karyon provides a clean framework for creating cloud-ready micro-services. In your organization if you use the Netflix OSS stack consisting of Eureka for service registration and discovery, Archaius for property management, then very likely you use Karyon to create your microservices.

Karyon has been undergoing quite a lot of changes recently and my objective here is to document a good sample using the newer version of Karyon. The old Karyon(call it Karyon1) was based on JAX-RS 1.0 Specs with Jersey as the implementation, the newer version of Karyon(Karyon2) still supports Jersey but also encourages the use of RX-Netty which is a customized version of Netty with support for Rx-java.

With that said, let me jump into a sample. My objective with this sample is to create a "pong" micro-service which takes a "POST"ed "message" and returns an "Acknowledgement"

The following is a sample request:

"id": "id",

And an expected response:


The first step is to create a RequestHandler which as the name suggests is an RX-Netty component dealing with routing the incoming request:


import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;

import java.nio.charset.Charset;

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;

    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    return response.close();
        } else {
            return response.close();

This flow is completely asynchronous and internally managed by the RX-java libraries, Java 8 Lambda expressions also help in making the code concise. The one issue that you would see here is that the routing logic(which uri to which controller) is mixed up with the actual controller logic and I believe this is being addressed.

Given this RequestHandler, a server can be started up in a standalone java program, using raw RX-Netty this way, this is essentially it, an endpoint will be brought up at port 8080 to handle the requests:

public final class RxNettyExample {

    public static void main(String... args) throws Exception {
        final ObjectMapper objectMapper = new ObjectMapper();
        RxNettyHandler handler = new RxNettyHandler();

        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);


This is however the native Rx-netty way, for a cloud-ready micro-service a few things have to happen, the service should register with Eureka and should respond to the healthchecks back from Eureka and should be able to load up properties using Archaius.

So with Karyon2, the startup in a main program looks a little different:


import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import org.bk.samplepong.resource.HealthCheck;

public class SamplePongApp {

    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),

Now it is essentially cloud ready, this version of the program on startup would register cleanly with Eureka and expose a healthcheck endpoint. It additionally exposes a neat set of admin endpoints at port 8077.


I hope this provides a good intro on using Karyon2 to develop Netflix OSS based. The entire sample is available at my github repo here: As a follow up I will show how the same service can be developed using spring-cloud which is the Spring way to create Micro-services.

Thursday, May 21, 2015

Akka samples with scala and Spring

I was looking around recently for Akka samples with Spring and found a starter project which appeared to fit the bill well. The project however utilizes Spring-Scala which is an excellent project, but is no longer maintained. So I wanted to update the sample to use core Spring java libraries instead. So here is an attempt on a fork of this starter project with core Spring instead of Spring-scala. The code is available here.

The project utilizes Akka extensions to hook in Spring based dependency injection into Akka.

Here is what the extension looks like:

package sample

import{ActorSystem, Props, Extension}
import org.springframework.context.ApplicationContext
 * The Extension implementation.
class SpringExtension extends Extension {
  var applicationContext: ApplicationContext = _

   * Used to initialize the Spring application context for the extension.
   * @param applicationContext
  def initialize(applicationContext: ApplicationContext) = {
    this.applicationContext = applicationContext

   * Create a Props for the specified actorBeanName using the
   * SpringActorProducer class.
   * @param actorBeanName  The name of the actor bean to create Props for
   * @return a Props that will create the named actor bean using Spring
  def props(actorBeanName: String): Props =
    Props(classOf[SpringActorProducer], applicationContext, actorBeanName)


object SpringExtension {
  def apply(system : ActorSystem )(implicit ctx: ApplicationContext) :  SpringExtension =  SpringExt(system).initialize(ctx)

So the extension wraps around a Spring application context. The extensions provides a props method which returns an Akka Props configuration object which uses the application context and the name which the actor is registered with Spring to return an instance of the Actor. The following is the SpringActorProducer:

package sample

import{Actor, IndirectActorProducer}
import org.springframework.context.ApplicationContext

class SpringActorProducer(ctx: ApplicationContext, actorBeanName: String) extends IndirectActorProducer {

  override def produce: Actor = ctx.getBean(actorBeanName, classOf[Actor])

  override def actorClass: Class[_ <: Actor] =


Given this base code, how does Spring find the actors, I have used scanning annotations to annotate the actors this way:


import sample.service.CountingService
import sample.SpringExtension._
import org.springframework.stereotype.Component
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Scope
import sample.SpringExtension
import org.springframework.context.ApplicationContext

class CountingCoordinating @Autowired() (implicit ctx: ApplicationContext) extends Actor {

  import sample.messages._

  var counter: Option[ActorRef] = None

  def receive = {
    case COUNT => countingActor() ! COUNT
    case g:GET => countingActor() ! g
  private def countingActor(): ActorRef = {
     if (counter.isEmpty) {
        val countingActorProp = SpringExtension(context.system).props("countingActor")
        counter = Some(context.actorOf(countingActorProp, "counter"))

class CountingActor @Autowired()(countingService: CountingService) extends Actor {

  import sample.messages._

  private var count = 0

  def receive = {
    case COUNT => count = countingService.increment(count)
    case GET(requester: ActorRef) => requester ! RESULT(count)

The CountingService is a simple service that gets injected in by Spring. The following is the main Spring Application configuration where all the wiring takes place:

import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Bean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.ComponentScan

@ComponentScan(Array("sample.service", ""))
class AppConfiguration {

  implicit var ctx: ApplicationContext = _;
   * Actor system singleton for this application.
  def actorSystem() = {
    val system = ActorSystem("AkkaScalaSpring")
    // initialize the application context in the Akka Spring Extension

To make use of this entire set-up in a sample program:

import{ActorRef, ActorSystem}
import sample.SpringExtension._
import scala.concurrent.duration._
import scala.concurrent._
import scala.util._
import sample.messages._
import org.springframework.context.annotation.AnnotationConfigApplicationContext

object Main extends App {
  // create a spring context
  implicit val ctx = new AnnotationConfigApplicationContext(classOf[AppConfiguration])

  import Config._

  // get hold of the actor system
  val system = ctx.getBean(classOf[ActorSystem])

  val inbox = Inbox.create(system)
  val prop = SpringExtension(system).props("countingCoordinatingActor")

  // use the Spring Extension to create props for a named actor bean
  val countingCoordinator = system.actorOf(prop, "counter")

  // tell it to count three times
  inbox.send(countingCoordinator, COUNT)
  inbox.send(countingCoordinator, COUNT)
  inbox.send(countingCoordinator, COUNT)
  inbox.send(countingCoordinator, GET(inbox.getRef()))
  val RESULT(count) = inbox.receive(5.seconds)

  println(s"Got $count")


Monday, May 11, 2015

Docker on Mac OSX with docker-machine and VMWare fusion

If you use Cisco Anyconnect VPN on your Mac OSX machine you would have found your experience with docker using boot2docker not working at times. The basic issue is that the Cisco Anyconnect VPN rewrites the routing rules which map the boot2docker Virtual box network interfaces.

The fix that has worked better for me in the last few days has been to not use boot2docker, but instead to use docker-machine to create a VMWare fusion based docker VM.

So to try out this approach first ensure that you have VMWare Fusion installed. Then install docker and docker-machine. Docker and docker-machine command line can be installed using homebrew:

brew install docker
brew install docker-machine

Once the VMWare fusion, docker and docker-machine are in place, then use docker-machine to create a docker host using VMWare fusion this way:

docker-machine create -d vmwarefusion                       

Once the Host is properly created, verify that it is in a running state using docker-machine:

docker-machine ls

That is essentially it, to use this shiny new docker host ensure that the appropriate environment variables are set in the shell:

eval "$(docker-machine env fusion)"

and your docker commands should work with or without VPN connectivity:

docker ps -a

Tuesday, May 5, 2015

Netflix Archaius for property management - Basics

Netflix Archaius provides a neat set of features to load dynamic properties into an application.

This blog post is just a documentation of the extent of Archaius that I have understood, there is much more to it than I have documented here, but this should provide a good start:

Default Behavior

Consider a simple properties file:
listprop=value1, value2, value3
mapprop=key1=value1, key2=value2

If these entries are placed in a file in the classpath, then the following test demonstrates how each of these properties can be resolved by Archaius in code:

public void testBasicStringProps() {
    DynamicStringProperty sampleProp = DynamicPropertyFactory.getInstance().getStringProperty("stringprop", "");
    assertThat(sampleProp.get(), equalTo("propvalue"));

public void testBasicListProps() {
    DynamicStringListProperty listProperty = new DynamicStringListProperty("listprop", Collections.emptyList());
    assertThat(listProperty.get(), contains("value1", "value2", "value3"));

public void testBasicMapProps() {
    DynamicStringMapProperty mapProperty = new DynamicStringMapProperty("mapprop", Collections.emptyMap());
    assertThat(mapProperty.getMap(), allOf(hasEntry("key1", "value1"), hasEntry("key2", "value2")));

public void testBasicLongProperty() {
    DynamicLongProperty longProp = DynamicPropertyFactory.getInstance().getLongProperty("longprop", 1000);
    assertThat(longProp.get(), equalTo(100L));

Loading Properties from a non-default file in classpath

So now, how do we handle a case where the content is to be loaded from a file with a different name, say but still available in the classpath. The following is one way to do that:

public void setUp() throws Exception{

With this change the previous test will just work.

Another option is to provide a system property to indicate the name of the properties file to load from the classpath:

System.setProperty("archaius.configurationSource.defaultFileName", "");

Overriding for environments

Now, how do we override the properties for different application environments - Archaius provides a neat feature where a base property file can be loaded up but then overridden based on the context. More details are here. To demonstrate this consider two files, one containing the defaults and one containing overrides for a "test" environment:

See the notation at the end of the default file @next=sample-${@environment}.properties, it is a way to indicate to Archaius that more properties need to be loaded up based on the resolved @environment parameter. This parameter can be injected in a couple of ways and the following test demonstrates this:

public void setUp() throws Exception{
    ConfigurationManager.getConfigInstance().setProperty("@environment", "test");

public void testBasicStringPropsInTestEnvironment() throws Exception {
    DynamicStringProperty sampleProp = DynamicPropertyFactory.getInstance().getStringProperty("sampleprop", "");
    assertThat(sampleProp.get(), equalTo("propvalue-test"));

The base property file itself now has to be loaded in through a call to ConfigurationManager.loadCascadedPropertiesFromResources..


These are essentially the basics of Netflix Archaius, there is much more to it of course which can be gleaned from the wiki on the Archaius github site. If you are interested in exploring the samples shown here a little more, they are available in this github project