Tuesday, July 28, 2015

Spring Boot @ConfigurationProperties

Spring Boot provides a very neat way to load properties for an application. Consider a set of properties described using yaml format:

prefix:
    stringProp1: propValue1
    stringProp2: propValue2
    intProp1: 10
    listProp:
        - listValue1
        - listValue2
    mapProp:
        key1: mapValue1
        key2: mapValue2

These entries can also be described in a traditional application.properties file the following way:

prefix.stringProp1=propValue1
prefix.stringProp2=propValue2
prefix.intProp1=10
prefix.listProp[0]=listValue1
prefix.listProp[1]=listValue2
prefix.mapProp.key1=mapValue1
prefix.mapProp.key2=mapValue2
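
To make the mapping between the two formats concrete, here is a minimal sketch that uses only the JDK to read the flat keys back into a list and a map. This is not how Spring Boot binds properties internally; the class FlatPropertyBinder and its methods are hypothetical names made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Spring Boot: shows how flat keys map back to a list and a map.
final class FlatPropertyBinder {

    // Parses text in the application.properties format.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Collects key[0], key[1], ... until the first missing index.
    static List<String> list(Properties props, String key) {
        List<String> values = new ArrayList<>();
        for (int i = 0; props.getProperty(key + "[" + i + "]") != null; i++) {
            values.add(props.getProperty(key + "[" + i + "]"));
        }
        return values;
    }

    // Collects every key.<name>=value entry found under the given key.
    static Map<String, String> map(Properties props, String key) {
        Map<String, String> values = new TreeMap<>();
        String start = key + ".";
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(start)) {
                values.put(name.substring(start.length()), props.getProperty(name));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Properties props = load(String.join("\n",
                "prefix.listProp[0]=listValue1",
                "prefix.listProp[1]=listValue2",
                "prefix.mapProp.key1=mapValue1",
                "prefix.mapProp.key2=mapValue2"));
        System.out.println(list(props, "prefix.listProp"));
        System.out.println(map(props, "prefix.mapProp"));
    }
}
```

The indexed keys become list positions and the dotted suffixes become map keys, which is the same structure the yaml version expresses through indentation.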

It has taken me a little while, but I do like the hierarchical look of the properties described in a yaml format.

So now, given this property file a traditional Spring application would have loaded up the properties the following way:

public class SamplePropertyLoadingTest {
    @Value("${prefix.stringProp1}")
    private String stringProp1;

    ...
}

Note the placeholder for the "prefix.stringProp1" key.

This, however, is not ideal for loading a family of related properties, such as the ones here, which are namespaced by the conveniently named prefix "prefix".

The approach Spring Boot takes is to define a bean that holds the whole family of related properties, this way:

@ConfigurationProperties(prefix = "prefix")
@Component
public class SampleProperty {
    private String stringProp1;
    private String stringProp2;
    @Max(99)
    @Min(0)
    private Integer intProp1;
    private List<String> listProp;
    private Map<String, String> mapProp;
    
    ...
}

At runtime, all the fields would be bound to the related properties cleanly.

Additionally, note the JSR-303 annotations on the "intProp1" field, which validate that the value of the field is between 0 and 99. @ConfigurationProperties will call the validator to ensure that the bound bean is validated.

An integration test making use of this is the following:

package prop;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SampleWebApplication.class)
public class SamplePropertyLoadingTest {
    @Autowired
    private SampleProperty sampleProperty;

    @Value("${prefix.stringProp1}")
    private String stringProp1;

    @Test
    public void testLoadingOfProperties() {
        System.out.println("stringProp1 = " + stringProp1);
        assertThat(sampleProperty.getStringProp1(), equalTo("propValue1"));
        assertThat(sampleProperty.getStringProp2(), equalTo("propValue2"));
        assertThat(sampleProperty.getIntProp1(), equalTo(10));
        assertThat(sampleProperty.getListProp(), hasItems("listValue1", "listValue2"));
        assertThat(sampleProperty.getMapProp(), allOf(hasEntry("key1", "mapValue1"),
                hasEntry("key2", "mapValue2")));
    }
}

If you are interested in exploring this sample further, I have a GitHub repo with the code checked in here.

Thursday, July 23, 2015

Scatter Gather - Using Java 8 CompletableFuture and Rx-Java Observable

I wanted to explore a simple scatter-gather scenario using Java 8 CompletableFuture and using Rx-Java Observable.


The scenario is simple: spawn about 10 tasks, each returning a string, and ultimately collect the results into a list.

Sequential

A sequential version of this would be the following:

public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

 logger.info(list.toString());
}

private String generateTask(int i) {
 Util.delay(2000);
 return i + "-" + "test";
}

With CompletableFuture

A method can be made to return a CompletableFuture using a utility method called supplyAsync. I am using a variation of this method which accepts an explicit Executor, and I am deliberately throwing an exception for one of the inputs:

private CompletableFuture<String> generateTask(int i,
  ExecutorService executorService) {
 return CompletableFuture.supplyAsync(() -> {
  Util.delay(2000);
  if (i == 5) {
   throw new RuntimeException("Run, it is a 5!");
  }
  return i + "-" + "test";
 }, executorService);
}

Now to scatter the tasks:

List<CompletableFuture<String>> futures =
  IntStream.range(0, 10)
    .boxed()
    .map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage()))
    .collect(Collectors.toList());

Scattering the tasks produces a list of CompletableFuture. Obtaining the list of String from this is a little tricky; here I am using one of the solutions suggested on Stack Overflow:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
  .thenApply(v -> futures.stream()
       .map(CompletableFuture::join)
       .collect(Collectors.toList()));

The CompletableFuture.allOf method is used here purely to compose the next action to take once all the scattered tasks are completed. Once the tasks are completed, the futures are streamed again and their results are collected into a list of String.

The final result can then be presented asynchronously:

result.thenAccept(l -> {
 logger.info(l.toString());
});
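
Putting the pieces above together, the following is a self-contained sketch of the same scatter-gather flow using only the JDK. It makes a few assumptions of its own: a short Thread.sleep stands in for Util.delay, the class name ScatterGatherSketch is made up, and the failure is recovered with a fixed "failed: N" string rather than the exception message.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative class name; mirrors the snippets above in one runnable unit.
final class ScatterGatherSketch {

    // Same shape as generateTask above; a short sleep stands in for Util.delay.
    static CompletableFuture<String> generateTask(int i, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (i == 5) {
                throw new RuntimeException("Run, it is a 5!");
            }
            return i + "-test";
        }, executor);
    }

    static List<String> scatterGather(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(count);
        try {
            // Scatter: start every task and recover a failed task with a placeholder value.
            List<CompletableFuture<String>> futures = IntStream.range(0, count)
                    .mapToObj(i -> generateTask(i, executor).exceptionally(t -> "failed: " + i))
                    .collect(Collectors.toList());

            // Gather: allOf completes when every future is done, so join() below never blocks.
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()))
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(scatterGather(10));
    }
}
```

The results come back in the order of the original list of futures, not in completion order, because the gather step streams over that list.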


With Rx-java Observable

Scatter-gather with Rx-java is relatively cleaner than the CompletableFuture version, as Rx-java provides better ways to compose the results together. Again, here is the method which performs the scattered task:

private Observable<String> generateTask(int i, ExecutorService executorService) {
    return Observable
            .<String>create(s -> {
                Util.delay(2000);
                if ( i == 5) {
                    throw new RuntimeException("Run, it is a 5!");
                }
                s.onNext( i + "-test");
                s.onCompleted();
            }).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}

and to scatter the tasks:

List<Observable<String>> obs =
        IntStream.range(0, 10)
            .boxed()
            .map(i -> generateTask(i, executors)).collect(Collectors.toList());

Once more I have a List of Observables, and what I need is a List of results. Observable provides a merge method to do just that:

Observable<List<String>> merged = Observable.merge(obs).toList();

which can be subscribed to and the results printed when available:

merged.subscribe(
                l -> logger.info(l.toString()));

Friday, July 10, 2015

Rx-netty and Karyon2 based cloud ready microservice - Dependency Injection

I had previously written about using Rx-netty and Karyon2 for developing cloud ready microservices. There were a few issues with the sample though, which is partly reproduced here:

package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

The issues are:

  1. The routing logic is not centralized, the request handler has both the routing logic and the processing logic
  2. The dependencies are not injected in cleanly.


Looking at the Karyon2 samples, both of these issues are actually very cleanly addressed now which I wanted to document here.

Routing

Routing can be centralized using a custom Rx-netty RequestHandler called the SimpleUriRouter.
The routes can be registered the following way using SimpleUriRouter, which is being created here using a Guice Provider:

import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;

public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {

    @Inject
    private HealthCheck healthCheck;

    @Inject
    private ApplicationMessageHandler applicationMessageHandler;

    @Override
    public SimpleUriRouter<ByteBuf, ByteBuf> get() {
        SimpleUriRouter<ByteBuf, ByteBuf> simpleUriRouter = new SimpleUriRouter<>();
        simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
        simpleUriRouter.addUri("/message", applicationMessageHandler);
        return simpleUriRouter;
    }
}

This router can now be registered via a custom guice module the following way:

public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {

    public KaryonAppModule() {
        super("routerModule", ByteBuf.class, ByteBuf.class);
    }

    @Override
    protected void configureServer() {
        bindRouter().toProvider(new AppRouteProvider());

        interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);

        server().port(8888);
    }
}

This is essentially it, now the routing logic is cleanly separated from the processing logic.
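
To illustrate the idea of keeping the route table in one place, here is a small JDK-only sketch. It is not the Karyon2 or Rx-netty API: PrefixRouter is a hypothetical class, and plain functions from a URI to a response string stand in for request handlers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the idea behind a URI router; this is not the Karyon2 API.
final class PrefixRouter {
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    // Registers a handler for every URI starting with the given prefix.
    PrefixRouter addUri(String prefix, Function<String, String> handler) {
        routes.put(prefix, handler);
        return this;
    }

    // Dispatches to the first registered handler whose prefix matches the URI.
    String handle(String uri) {
        for (Map.Entry<String, Function<String, String>> route : routes.entrySet()) {
            if (uri.startsWith(route.getKey())) {
                return route.getValue().apply(uri);
            }
        }
        return "404 " + uri;
    }

    public static void main(String[] args) {
        PrefixRouter router = new PrefixRouter()
                .addUri("/healthcheck", uri -> "UP")
                .addUri("/message", uri -> "Pong");
        System.out.println(router.handle("/message"));
        System.out.println(router.handle("/unknown"));
    }
}
```

The handlers know nothing about URIs other than their own, and the not-found case lives in the router instead of being repeated in each handler.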

Dependency Injection


Dependency injection is handled via custom Guice modules. I have a service, call it the MessageHandlerService, which takes in a message and returns an acknowledgement. The implementation of this service is defined as follows:

public class MessageHandlerServiceImpl implements MessageHandlerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);

    public Observable<MessageAcknowledgement> handleMessage(Message message) {
        return Observable.<MessageAcknowledgement>create(s -> {
            s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
            s.onCompleted();
        });
    }


}

Now, I have a guice module which specifies the binding between MessageHandlerService interface and the concrete MessageHandlerServiceImpl:

public class AppModule extends AbstractModule {


    @Override
    protected void configure() {
        bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
    }
}


With this in place, the MessageHandlerService can be injected in:

public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final MessageHandlerService messageHandlerService;

    @Inject
    public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
        this.messageHandlerService = messageHandlerService;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -> {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .flatMap(messageHandlerService::handleMessage)
                .flatMap(ack -> {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    }
}


With both these features implemented, the app using Karyon2 is also greatly simplified and I have the complete working app in my github repository here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong

Monday, June 29, 2015

Learning Spring-Cloud - Writing a microservice

Continuing my Spring-Cloud learning journey: earlier I covered how to write the infrastructure components of a typical Spring-Cloud and Netflix OSS based micro-services environment, specifically two critical components: Eureka, to register and discover services, and Spring Cloud Configuration, to maintain a centralized repository of configuration for a service. Here I will show how I developed two dummy micro-services: a simple "pong" service and a "ping" service which uses the "pong" service.


Sample-Pong microservice


The endpoint handling the "ping" requests is a typical Spring MVC based endpoint:

@RestController
public class PongController {

    @Value("${reply.message}")
    private String message;

    @RequestMapping(value = "/message", method = RequestMethod.POST)
    public Resource<MessageAcknowledgement> pongMessage(@RequestBody Message input) {
        return new Resource<>(
                new MessageAcknowledgement(input.getId(), input.getPayload(), message));
    }

}

It gets a message and responds with an acknowledgement. Here the service uses the Configuration server to source the "reply.message" property. So how does the "pong" service find the Configuration server? There are potentially two ways: directly, by specifying the location of the Configuration server, or by finding the Configuration server via Eureka. I am used to an approach where Eureka is considered the source of truth, so in this spirit I am using Eureka to find the Configuration server. Spring Cloud makes this entire flow very simple; all it requires is a "bootstrap.yml" property file with entries along these lines:

---
spring:
  application:
    name: sample-pong
  cloud:
    config:
      discovery:
        enabled: true
        serviceId: SAMPLE-CONFIG

eureka:
  instance:
    nonSecurePort: ${server.port:8082}
  client:
    serviceUrl:
      defaultZone: http://${eureka.host:localhost}:${eureka.port:8761}/eureka/

The location of Eureka is specified through the "eureka.client.serviceUrl" property and the "spring.cloud.config.discovery.enabled" is set to "true" to specify that the configuration server is discovered via the specified Eureka server.
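
The ${name:default} placeholders in the file above fall back to the value after the colon when the named property is not set, so the Eureka URL resolves to http://localhost:8761/eureka/ unless eureka.host or eureka.port is supplied. The following JDK-only sketch mimics that substitution rule for illustration; it is not Spring's resolver, and the class PlaceholderSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of ${name:default} substitution; not Spring's implementation.
final class PlaceholderSketch {
    // Group 1 is the property name, group 2 the optional default after the colon.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            // Prefer the supplied value, otherwise use the default from the placeholder.
            String value = values.containsKey(name) ? values.get(name) : matcher.group(2);
            if (value == null) {
                throw new IllegalArgumentException("No value or default for " + name);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String url = "http://${eureka.host:localhost}:${eureka.port:8761}/eureka/";
        System.out.println(resolve(url, Map.of()));
        System.out.println(resolve(url, Map.of("eureka.host", "dockerhost")));
    }
}
```

This is why the same bootstrap.yml can work both locally and in a container: only the properties that differ need to be overridden.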

Just a note: this means that Eureka and the Configuration server have to be completely up before trying to bring up the actual services. They are pre-requisites, and the underlying assumption is that the infrastructure components are available at application boot time.

The Configuration server has the properties for the "sample-pong" service. This can be validated using the Config server's endpoint, http://localhost:8888/sample-pong/default (8888 is the port I had specified for the server endpoint), which should respond with content along these lines:

{
  "name": "sample-pong",
  "profiles": [
    "default"
  ],
  "label": "master",
  "propertySources": [
    {
      "name": "classpath:/config/sample-pong.yml",
      "source": {
        "reply.message": "Pong"
      }
    }
  ]
}

As can be seen, the "reply.message" property from this central configuration server will be used by the pong service as the acknowledgement message.

Now to set up this endpoint as a service, all that is required is a Spring-boot based entry point along these lines:

@SpringBootApplication
@EnableDiscoveryClient
public class PongApplication {
    public static void main(String[] args) {
        SpringApplication.run(PongApplication.class, args);
    }
}

and that completes the code for the "pong" service.


Sample-ping micro-service


So now onto a consumer of the "pong" micro-service, very imaginatively named the "ping" micro-service. Spring-Cloud and Netflix OSS offer a lot of options to invoke endpoints on Eureka registered services, to summarize the options that I had:

1. Use raw Eureka DiscoveryClient to find the instances hosting a service and make calls using Spring's RestTemplate.

2. Use Ribbon, a client side load balancing solution which can use Eureka to find service instances

3. Use Feign, which provides a declarative way to invoke a service call. It internally uses Ribbon.

I went with Feign. All that is required is an interface which shows the contract to invoke the service:

package org.bk.consumer.feign;

import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

@FeignClient("samplepong")
public interface PongClient {

    @RequestMapping(method = RequestMethod.POST, value = "/message",
            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    MessageAcknowledgement sendMessage(@RequestBody Message message);
}

The annotation @FeignClient("samplepong") internally points to a Ribbon "named" client called "samplepong". This means that there has to be an entry in the property files for this named client. In my case I have these entries in my application.yml file:

samplepong:
  ribbon:
    DeploymentContextBasedVipAddresses: sample-pong
    NIWSServerListClassName: com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
    ReadTimeout: 5000
    MaxAutoRetries: 2

The most important entry here is "samplepong.ribbon.DeploymentContextBasedVipAddresses", which points to the "pong" service's Eureka registration address; Ribbon uses this address to discover the service instances.

The rest of the application is a routine Spring Boot application. I have exposed this service call behind Hystrix which guards against service call failures and essentially wraps around this FeignClient:

package org.bk.consumer.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.bk.consumer.domain.Message;
import org.bk.consumer.domain.MessageAcknowledgement;
import org.bk.consumer.feign.PongClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service("hystrixPongClient")
public class HystrixWrappedPongClient implements PongClient {

    @Autowired
    @Qualifier("pongClient")
    private PongClient feignPongClient;

    @Override
    @HystrixCommand(fallbackMethod = "fallBackCall")
    public MessageAcknowledgement sendMessage(Message message) {
        return this.feignPongClient.sendMessage(message);
    }

    public MessageAcknowledgement fallBackCall(Message message) {
        MessageAcknowledgement fallback = new MessageAcknowledgement(message.getId(), message.getPayload(), "FAILED SERVICE CALL! - FALLING BACK");
        return fallback;
    }
}
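
The fallback part of this wrapper can be pictured with a few lines of plain Java. The sketch below only shows the "call the primary, fall back on failure" idea; Hystrix itself adds timeouts, thread isolation and circuit breaking on top. FallbackSketch and withFallback are hypothetical names, not Hystrix API.

```java
import java.util.function.Function;

// Hypothetical helper illustrating only the fallback idea behind a guarded call.
final class FallbackSketch {

    // Wraps a call so that a runtime failure in the primary is answered by the fallback.
    static <I, O> Function<I, O> withFallback(Function<I, O> primary, Function<I, O> fallback) {
        return input -> {
            try {
                return primary.apply(input);
            } catch (RuntimeException e) {
                return fallback.apply(input);
            }
        };
    }

    public static void main(String[] args) {
        Function<String, String> failing = message -> {
            throw new IllegalStateException("pong service is down");
        };
        Function<String, String> guarded =
                withFallback(failing, message -> "FAILED SERVICE CALL! - FALLING BACK");
        System.out.println(guarded.apply("ping"));
    }
}
```

As with the fallBackCall method above, the fallback receives the same input as the original call, so it can echo the id and payload back to the caller.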


"Boot"ing up


I have dockerized my entire set-up, so the simplest way to start up the set of applications is to first build the docker images for all of the artifacts this way:

mvn clean package docker:build -DskipTests

and bring all of them up using the following command, the assumption being that both docker and docker-compose are available locally:

docker-compose up

Assuming everything comes up cleanly, Eureka should show all the registered services at the url http://dockerhost:8761

The UI of the ping application should be available at the url http://dockerhost:8080

Additionally, a Hystrix dashboard should be available to monitor the requests to the "pong" app at this url: http://dockerhost:8989/hystrix/monitor?stream=http%3A%2F%2Fsampleping%3A8080%2Fhystrix.stream



References


1. The code is available at my github location - https://github.com/bijukunjummen/spring-cloud-ping-pong-sample

2. Most of the code is heavily borrowed from the spring-cloud-samples repository - https://github.com/spring-cloud-samples

Tuesday, June 23, 2015

Rx-java subscribeOn and observeOn

If you have been confused by Rx-java Observable subscribeOn and observeOn, one of the blog articles that helped me understand these operations is this one by Graham Lea. I wanted to recreate a very small part of the article here. Consider a service which emits values every 200 milliseconds:



package obs.threads;

import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class GeneralService {
    private static final Logger logger = LoggerFactory.getLogger(GeneralService.class);
    public Observable<String> getData() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing a Service");
            for (int i = 1; i <= 3; i++) {
                Util.delay(200);
                logger.info("Emitting {}", "root " + i);
                s.onNext("root " + i);
            }
            logger.info("End: Executing a Service");
            s.onCompleted();
        });
    }
}

Now, if I were to subscribe to this service, this way:

@Test
public void testThreadedObservable1() throws Exception {
    Observable<String> ob1 = aService.getData();

    CountDownLatch latch = new CountDownLatch(1);

    ob1.subscribe(s -> {
        Util.delay(500);
        logger.info("Got {}", s);
    }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

    latch.await();
}

All of the emissions and subscriptions will act on the main thread and something along the following lines will be printed:

20:53:29.380 [main] INFO  o.t.GeneralService - Start: Executing a Service
20:53:29.587 [main] INFO  o.t.GeneralService - Emitting root 1
20:53:30.093 [main] INFO  o.t.ThreadedObsTest - Got root 1
20:53:30.298 [main] INFO  o.t.GeneralService - Emitting root 2
20:53:30.800 [main] INFO  o.t.ThreadedObsTest - Got root 2
20:53:31.002 [main] INFO  o.t.GeneralService - Emitting root 3
20:53:31.507 [main] INFO  o.t.ThreadedObsTest - Got root 3
20:53:31.507 [main] INFO  o.t.GeneralService - End: Executing a Service

By default the emissions are not asynchronous in nature. So now, what is the behavior if subscribeOn is used:

public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
    private ExecutorService executor1 = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

    @Test
    public void testSubscribeOn() throws Exception {
        Observable<String> ob1 = aService.getData();

        CountDownLatch latch = new CountDownLatch(1);

        ob1.subscribeOn(Schedulers.from(executor1)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}

Here I am using Guava's ThreadFactoryBuilder to give each thread in the threadpool a unique name pattern. If I were to execute this code, the output would be along these lines:

20:56:47.117 [SubscribeOn-0] INFO  o.t.GeneralService - Start: Executing a Service
20:56:47.322 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 1
20:56:47.828 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 1
20:56:48.032 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 2
20:56:48.535 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 2
20:56:48.740 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 3
20:56:49.245 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 3
20:56:49.245 [SubscribeOn-0] INFO  o.t.GeneralService - End: Executing a Service

Now, the execution has moved away from the main thread and the emissions and the subscriptions are being processed in the threads borrowed from the threadpool.

And what happens if observeOn is used:

public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
    private ExecutorService executor2 = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("ObserveOn-%d").build());

    @Test
    public void testObserveOn() throws Exception {
        Observable<String> ob1 = aService.getData();

        CountDownLatch latch = new CountDownLatch(1);

        ob1.observeOn(Schedulers.from(executor2)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}

the output is along these lines:

21:03:08.655 [main] INFO  o.t.GeneralService - Start: Executing a Service
21:03:08.860 [main] INFO  o.t.GeneralService - Emitting root 1
21:03:09.067 [main] INFO  o.t.GeneralService - Emitting root 2
21:03:09.268 [main] INFO  o.t.GeneralService - Emitting root 3
21:03:09.269 [main] INFO  o.t.GeneralService - End: Executing a Service
21:03:09.366 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 1
21:03:09.872 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 2
21:03:10.376 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 3

The emissions are now back on the main thread but the subscriptions are being processed in a threadpool.

That is the difference: when subscribeOn is used, the emissions are performed on the specified Scheduler (and, unless observeOn is also used, so is the subscriber's processing); when observeOn is used, the subscriptions are processed on the specified Scheduler!
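
The hand-off that observeOn performs can be pictured without Rx-java at all. The sketch below is only a loose analogy built on a plain JDK executor, not the Rx-java implementation: values are produced on the calling thread and each one is handed to a single worker thread for processing. HandOffSketch and the thread name "ObserveOn-0" are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Loose JDK-only analogy for observeOn: emit on the caller, process on another thread.
final class HandOffSketch {

    // Returns log lines of the form "<event> on <thread name>".
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService observer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "ObserveOn-0"));
        for (int i = 1; i <= 3; i++) {
            String item = "root " + i;
            // The emission is recorded on whichever thread called run().
            log.add("Emitting " + item + " on " + Thread.currentThread().getName());
            // The processing is queued for the worker thread instead of running inline.
            observer.execute(() ->
                    log.add("Got " + item + " on " + Thread.currentThread().getName()));
        }
        observer.shutdown();
        try {
            observer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the producer never waits for the consumer, the emissions can all finish before the slower processing does, which matches the ordering seen in the observeOn output above.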

And the output when both are specified is equally predictable. In all of these cases I had created a Scheduler from a threadpool with 5 threads, but only one of the threads was really used, both for emitting values and for processing subscriptions; this is actually the normal behavior of an Observable. If you want to make more efficient use of the threadpool, one approach may be to create multiple Observables. For example, say I have a service which returns pages of data this way:

public Observable<Integer> getPages(int totalPages) {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            logger.info("Getting pages");
            for (int i = 1; i <= totalPages; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
}

and another service which acts on each page of the data:

public Observable<String> actOnAPage(int pageNum) {
    return Observable.<String>create(s -> {
        Util.delay(200);
        logger.info("Acting on page {}",  pageNum);
        s.onNext("Page " + pageNum);
        s.onCompleted();
    });
}

a way to use a Threadpool to process each page of data would be to chain it this way:

getPages(5)
        .flatMap(page -> aService.actOnAPage(page).subscribeOn(Schedulers.from(executor1)))
        .subscribe(s -> {
            logger.info("Completed Processing page: {}", s);
        });

See how the subscribeOn is applied to each Observable acting on a page. With this change, the output would look like this:

21:15:45.572 [main] INFO  o.t.ThreadedObsTest - Getting pages
21:15:45.787 [SubscribeOn-1] INFO  o.t.GeneralService - Acting on page 2
21:15:45.787 [SubscribeOn-0] INFO  o.t.GeneralService - Acting on page 1
21:15:45.787 [SubscribeOn-4] INFO  o.t.GeneralService - Acting on page 5
21:15:45.787 [SubscribeOn-3] INFO  o.t.GeneralService - Acting on page 4
21:15:45.787 [SubscribeOn-2] INFO  o.t.GeneralService - Acting on page 3
21:15:45.789 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 2
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 1
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 3
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 4
21:15:45.791 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 5

Now the threads in the threadpool are being used uniformly.

Saturday, June 13, 2015

Learning Spring-Cloud - Infrastructure and Configuration

I got a chance to play with Spring-Cloud to create a sample set of cloud ready microservices and I am very impressed by how Spring-Cloud enables different infrastructure components and services to work together nicely.

I am used to creating microservices on a Netflix OSS based stack, and in a Netflix stack Eureka is typically considered the hub through which the microservices register themselves and discover each other. In the spirit of this model, I wanted to try out a series of services which look like this:




There are 2 microservices here:


  • A sample-pong service which responds to "ping" messages
  • A sample-ping service which uses the "pong" micro-service


And there are two infrastructure components:

  • Sample-config which provides a centralized configuration for the 2 microservices
  • Eureka which is the central hub providing a way for the services to register themselves and discover other services

So to start with, here I will introduce how I went about using spring-cloud to develop the two infrastructure components and follow it up with how the microservices can be developed to use these components.
The entire project is available at my github location.


Eureka

Spring-cloud makes it very simple to bring up an instance of Eureka, all that is required is a class along the following lines:

package org.bk.eureka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }
}

Multiple instances of Eureka can be started up and configured to work together in a resilient way. Here, though, I just want a standalone demo Eureka instance, which can be done using a configuration like the following; it essentially starts up Eureka on port 8761 in standalone mode by not trying to look for peers:

---
# application.yml
server:
  port: 8761

eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false


Configuration Server

Spring-Cloud provides a centralized configuration server that microservices can use for loading up their properties. Typically microservices may want to go one of two ways:


  1. Use Eureka as a hub and find the configuration services
  2. Use Configuration services and find Eureka

I personally prefer the Eureka-first approach. In this sample the Configuration server registers itself with Eureka, and when the microservices come up they first check with Eureka, find the Configuration service and use it to load their properties.
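
The Eureka-first order described above can be sketched in plain Java, with an in-memory map standing in for the Eureka registry. This is only an illustration under stated assumptions: `InMemoryRegistry`, `register`, `lookup` and `EurekaFirstDemo` are hypothetical names, not Spring-Cloud or Eureka APIs, and upper-casing the name merely mimics how application names are displayed by Eureka.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a service registry such as Eureka; not a real Eureka API.
class InMemoryRegistry {

    private final Map<String, String> services = new ConcurrentHashMap<>();

    // A service registers its base URL under its application name.
    void register(String name, String baseUrl) {
        services.put(normalize(name), baseUrl);
    }

    // A client asks the registry where a named service lives.
    Optional<String> lookup(String name) {
        return Optional.ofNullable(services.get(normalize(name)));
    }

    // Assumption: names are compared case-insensitively.
    private static String normalize(String name) {
        return name.toUpperCase(Locale.ROOT);
    }
}

class EurekaFirstDemo {

    // Step 1: find the config server through the registry.
    // Step 2: derive the location from which the application's properties are loaded.
    static String configUrlFor(InMemoryRegistry registry, String application) {
        String configServer = registry.lookup("sample-config")
                .orElseThrow(() -> new IllegalStateException("config server not registered"));
        return configServer + "/" + application + "/default";
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        // The config server registers itself first ...
        registry.register("sample-config", "http://localhost:8888");
        // ... and a microservice later discovers it by name.
        System.out.println(configUrlFor(registry, "sample-pong"));
    }
}
```

With the config-first alternative the two steps would be reversed: the location of the configuration server would have to be known up front, and the location of Eureka would be one of the properties it serves.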

The configuration server is simple to write using Spring-cloud too; the following is all the code that is required:

package org.bk.configserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableConfigServer
@EnableEurekaClient
public class ConfigServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConfigServerApplication.class, args);
    }
}

and the configuration that registers this service with Eureka:

---
# bootstrap.yml
spring:
  application:
    name: sample-config
  profiles:
    active: native

eureka:
  instance:
    nonSecurePort: ${server.port:8888}
  client:
    serviceUrl:
      defaultZone: http://${eureka.host:localhost}:${eureka.port:8761}/eureka/


---
# application.yml
spring:
  cloud:
    config:
      server:
        native:
          searchLocations: classpath:/config

server:
  port: 8888
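
The `${server.port:8888}` and `${eureka.host:localhost}` entries in bootstrap.yml above are placeholders with defaults: the value after the colon is used when the named property is not set. The following is a minimal sketch of that fallback rule in plain Java. It is not Spring's implementation, and `PlaceholderSketch` and `resolve` are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a tiny resolver for ${name:default} placeholders, not Spring's implementation.
class PlaceholderSketch {

    // Matches ${name} or ${name:default}; group 1 is the name, group 2 the optional default.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String template, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String fallback = matcher.group(2);
            // An explicitly set property wins over the default after the colon.
            String value = properties.containsKey(name) ? properties.get(name) : fallback;
            if (value == null) {
                throw new IllegalArgumentException("No value or default for " + name);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        String template = "http://${eureka.host:localhost}:${eureka.port:8761}/eureka/";
        // Nothing set: both defaults apply.
        System.out.println(resolve(template, Collections.<String, String>emptyMap()));
        // Only the host is overridden (the host name here is a made-up example).
        System.out.println(resolve(template, Collections.singletonMap("eureka.host", "eureka.example")));
    }
}
```

This is why the same bootstrap.yml works unchanged on a developer machine and in an environment where `eureka.host` and `eureka.port` are supplied from outside.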

The configuration server is started on port 8888 and serves configuration from the classpath. In a real application, the configuration can be set to load from a central git repository, which provides a clean way to version the properties and to manage them centrally. In this specific case, since the server provides properties for two microservices, there are two files in the classpath, each providing the appropriate properties to the calling application:

---
# sample-pong.yml
reply:
  message: Pong

---
# sample-ping.yml
send:
  message: Ping


Starting up Eureka and Configuration Server

Since both these applications are Spring-boot based, they can each be started up by running the following command:

mvn spring-boot:run

Once Eureka and the Configuration server come up cleanly, Eureka provides a nice interface with details of the services registered with it. In this case the Configuration server shows up with the name "SAMPLE-CONFIG":


The config server provides properties to the calling applications through endpoints with the pattern:
/{application}/{profile}[/{label}]

So to retrieve the properties for "sample-pong" application, the following url is used internally by the application:

http://localhost:8888/sample-pong/default

and for the "sample-ping" application the properties can be derived from http://localhost:8888/sample-ping/default
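
The endpoint pattern above, with its optional label segment, can be sketched as a small helper. This is only an illustration: `ConfigServerUrl` and `build` are hypothetical names and not part of Spring-Cloud, and the "master" label in the usage is an assumed example value.

```java
// Illustrative helper; the names here are hypothetical and not part of Spring-Cloud.
class ConfigServerUrl {

    // Builds {baseUrl}/{application}/{profile}[/{label}]; a null or empty label is omitted.
    static String build(String baseUrl, String application, String profile, String label) {
        StringBuilder url = new StringBuilder(baseUrl);
        if (!baseUrl.endsWith("/")) {
            url.append('/');
        }
        url.append(application).append('/').append(profile);
        if (label != null && !label.isEmpty()) {
            url.append('/').append(label);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // Without a label, as in the two URLs above.
        System.out.println(build("http://localhost:8888", "sample-pong", "default", null));
        // With a label; "master" is an assumed example value.
        System.out.println(build("http://localhost:8888", "sample-ping", "default", "master"));
    }
}
```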


This concludes the details around bringing up the infrastructure components of a cloud-ready system. I will follow it up with how the microservices that make use of these infrastructure components can be developed. The code behind these samples is available at my github repository.

Saturday, May 30, 2015

Rx-netty and Karyon2 based cloud ready microservice

Netflix Karyon provides a clean framework for creating cloud-ready micro-services. If your organization uses the Netflix OSS stack, consisting of Eureka for service registration and discovery and Archaius for property management, then very likely you use Karyon to create your microservices.

Karyon has been undergoing quite a lot of changes recently, and my objective here is to document a good sample using the newer version of Karyon. The old Karyon (call it Karyon1) was based on the JAX-RS 1.0 specs with Jersey as the implementation; the newer version of Karyon (Karyon2) still supports Jersey but also encourages the use of RX-Netty, which is a customized version of Netty with support for Rx-java.

With that said, let me jump into a sample. My objective with this sample is to create a "pong" micro-service which takes a "POST"ed "message" and returns an "Acknowledgement".

The following is a sample request:

{
"id": "id",
"payload":"Ping"
}

And an expected response:

{"id":"id","received":"Ping","payload":"Pong"}
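
The handler shown below works with two small domain classes, `Message` and `MessageAcknowledgement`, which are not listed in this post. The following is a sketch of what they might look like, inferred only from the JSON above and from the way the handler uses them; the actual classes in the sample project may differ. In a real project each would be a public class in its own file under `org.bk.samplepong.domain`.

```java
// Assumed shape of the request body: {"id": "...", "payload": "..."}
class Message {
    private String id;
    private String payload;

    // No-arg constructor and setters so that a JSON mapper can populate the object.
    public Message() {
    }

    public Message(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}

// Assumed shape of the response body: {"id": "...", "received": "...", "payload": "..."}
class MessageAcknowledgement {
    private final String id;
    private final String received;
    private final String payload;

    public MessageAcknowledgement(String id, String received, String payload) {
        this.id = id;
        this.received = received;
        this.payload = payload;
    }

    public String getId() {
        return id;
    }

    public String getReceived() {
        return received;
    }

    public String getPayload() {
        return payload;
    }
}

class DomainDemo {
    public static void main(String[] args) {
        // The same transformation the service performs: echo the id, record what was received, reply "Pong".
        Message message = new Message("id", "Ping");
        MessageAcknowledgement ack =
                new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong");
        System.out.println(ack.getId() + " " + ack.getReceived() + " " + ack.getPayload());
    }
}
```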


The first step is to create a RequestHandler which, as the name suggests, is an RX-Netty component that deals with routing the incoming request:

package org.bk.samplepong.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;

import java.io.IOException;
import java.nio.charset.Charset;


public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

This flow is completely asynchronous and is managed internally by the RX-java libraries; Java 8 lambda expressions also help in keeping the code concise. The one issue you will see here is that the routing logic (which uri goes to which controller) is mixed up with the actual controller logic, and I believe this is being addressed.
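
To make the routing concern concrete, here is a rough sketch of how the "which uri to which controller" decision could be kept in a table, separate from the handlers themselves. This is not Karyon's or RX-Netty's API: `SimpleRouter`, `route` and `dispatch` are hypothetical names, and the handlers are simplified to synchronous functions from request body to response body, whereas the real handler returns an `Observable<Void>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of keeping routing separate from handling; not a Karyon or RX-Netty API.
class SimpleRouter {

    // One route: an HTTP method, a URI prefix and the handler to invoke on a match.
    private static final class Route {
        final String method;
        final String uriPrefix;
        final Function<String, String> handler;

        Route(String method, String uriPrefix, Function<String, String> handler) {
            this.method = method;
            this.uriPrefix = uriPrefix;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();
    private final Function<String, String> notFound;

    SimpleRouter(Function<String, String> notFound) {
        this.notFound = notFound;
    }

    // Registers a route and returns this router so that calls can be chained.
    SimpleRouter route(String method, String uriPrefix, Function<String, String> handler) {
        routes.add(new Route(method, uriPrefix, handler));
        return this;
    }

    // The first matching route wins; anything else falls through to the notFound handler.
    String dispatch(String method, String uri, String body) {
        for (Route route : routes) {
            if (route.method.equals(method) && uri.startsWith(route.uriPrefix)) {
                return route.handler.apply(body);
            }
        }
        return notFound.apply(body);
    }

    public static void main(String[] args) {
        SimpleRouter router = new SimpleRouter(body -> "404")
                .route("GET", "/healthcheck", body -> "200")
                .route("POST", "/message", body -> "Pong for " + body);
        System.out.println(router.dispatch("POST", "/message", "Ping"));
        System.out.println(router.dispatch("GET", "/unknown", ""));
    }
}
```

With a table like this, the if/else chain in the handler reduces to a single dispatch call, and each controller can be tested on its own.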

Given this RequestHandler, a server can be started up in a standalone Java program using raw RX-Netty in the following way. This is essentially all that is needed; an endpoint will be brought up at port 8080 to handle the requests:

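package org.bk.samplepong.app;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public final class RxNettyExample {

    public static void main(String... args) throws Exception {
        // The handler's constructor needs the health check uri and endpoint
        RxNettyHandler handler = new RxNettyHandler("/healthcheck",
                new HealthCheckEndpoint(new HealthCheck()));
        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);
        // Block the main thread so that the server keeps handling requests
        server.startAndWait();
    }
}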

This, however, is the native Rx-netty way. For a cloud-ready micro-service a few more things have to happen: the service should register with Eureka, respond to the healthchecks coming from Eureka, and be able to load up properties using Archaius.

So with Karyon2, the startup in a main program looks a little different:

package org.bk.samplepong.app;

import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public class SamplePongApp {

    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
        Karyon.forRequestHandler(8888,
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),
                KaryonEurekaModule.asBootstrapModule(),
                Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                ShutdownModule.asBootstrapModule(),
                KaryonServoModule.asBootstrapModule()
        ).startAndWaitTillShutdown();
    }
}

Now it is essentially cloud ready: on startup this version of the program registers cleanly with Eureka and exposes a healthcheck endpoint. It additionally exposes a neat set of admin endpoints at port 8077.


Conclusion

I hope this provides a good intro to using Karyon2 to develop Netflix OSS based microservices. The entire sample is available at my github repo here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong. As a follow up I will show how the same service can be developed using spring-cloud, which is the Spring way to create micro-services.