Saturday, May 30, 2015

RxNetty and Karyon2 based cloud-ready microservice

Netflix Karyon provides a clean framework for creating cloud-ready microservices. If your organization uses the Netflix OSS stack, with Eureka for service registration and discovery and Archaius for property management, then you very likely use Karyon to create your microservices.

Karyon has been undergoing quite a lot of changes recently, and my objective here is to document a good sample using the newer version of Karyon. The old Karyon (call it Karyon1) was based on the JAX-RS 1.0 spec with Jersey as the implementation. The newer version (Karyon2) still supports Jersey but also encourages the use of RxNetty, which is a reactive adapter over Netty built on RxJava.

With that said, let me jump into a sample. My objective with this sample is to create a "pong" microservice which accepts a POSTed "message" and returns an "acknowledgement".
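To make the shape of the two payloads concrete, here is a minimal sketch of the domain types. The handler shown later only tells us that Message exposes getId() and getPayload() and that MessageAcknowledgement is built from three strings. The field name received, the sample value "Ping" and the wrapper class PongDomainSketch are my assumptions for illustration. In the real project these would be separate classes that Jackson binds to JSON.

```java
public class PongDomainSketch {

    /** Incoming message: an id plus a payload, mirroring the getters the handler calls. */
    public static final class Message {
        private String id;
        private String payload;

        // No-arg constructor and setters so that a JSON mapper can populate the object
        public Message() { }

        public Message(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getPayload() { return payload; }
        public void setPayload(String payload) { this.payload = payload; }
    }

    /** Response: echoes the id and the payload that was received, plus the reply payload. */
    public static final class MessageAcknowledgement {
        private final String id;
        private final String received; // assumed field name, not taken from the original sample
        private final String payload;

        public MessageAcknowledgement(String id, String received, String payload) {
            this.id = id;
            this.received = received;
            this.payload = payload;
        }

        public String getId() { return id; }
        public String getReceived() { return received; }
        public String getPayload() { return payload; }
    }

    /** The same transformation the handler applies to every incoming message. */
    public static MessageAcknowledgement acknowledge(Message m) {
        return new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong");
    }

    public static void main(String[] args) {
        // "Ping" is a hypothetical payload used only for this demonstration
        MessageAcknowledgement ack = acknowledge(new Message("id", "Ping"));
        System.out.println(ack.getId() + " / " + ack.getReceived() + " / " + ack.getPayload());
    }
}
```

Keeping the transformation in a plain method like acknowledge makes it easy to test without starting a server.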

The following is a sample request:

"id": "id",

And an expected response:


The first step is to create a RequestHandler, which, as the name suggests, is the RxNetty component that deals with routing the incoming request:


import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;

import java.io.IOException;
import java.nio.charset.Charset;

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            // Health check requests are delegated to the health check endpoint
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            // Decode the body, parse it into a Message, build the acknowledgement and write it back as JSON
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    return response.close();
                                }
                            }
                    );
        } else {
            // Anything else is not handled; just close the response
            return response.close();
        }
    }
}

This flow is completely asynchronous and managed internally by the RxJava libraries; Java 8 lambda expressions also help make the code concise. The one issue you would see here is that the routing logic (which URI maps to which controller) is mixed up with the actual controller logic. I believe this is being addressed.
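To illustrate what separating the two concerns could look like, here is a small sketch in plain Java with no RxNetty or Karyon types involved. It is not how Karyon addresses the problem; the RouteTableSketch class, its register and dispatch methods, and the use of String for request and response bodies are all hypothetical simplifications of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class RouteTableSketch {

    /** One routing rule: HTTP method plus URI prefix, mapped to a controller function. */
    private static final class Route {
        final String method;
        final String uriPrefix;
        final Function<String, String> controller;

        Route(String method, String uriPrefix, Function<String, String> controller) {
            this.method = method;
            this.uriPrefix = uriPrefix;
            this.controller = controller;
        }

        boolean matches(String requestMethod, String uri) {
            return method.equals(requestMethod) && uri.startsWith(uriPrefix);
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Routing is declared here, in one place, separately from the controller bodies. */
    public RouteTableSketch register(String method, String uriPrefix, Function<String, String> controller) {
        routes.add(new Route(method, uriPrefix, controller));
        return this;
    }

    /** Finds the first matching route and applies its controller; empty if nothing matches. */
    public Optional<String> dispatch(String method, String uri, String body) {
        for (Route route : routes) {
            if (route.matches(method, uri)) {
                return Optional.of(route.controller.apply(body));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        RouteTableSketch router = new RouteTableSketch()
                .register("GET", "/healthcheck", body -> "OK")
                .register("POST", "/message", body -> "Pong: " + body);

        System.out.println(router.dispatch("POST", "/message", "Ping").orElse("no route"));
        System.out.println(router.dispatch("GET", "/unknown", "").orElse("no route"));
    }
}
```

With a table like this the handler's if/else chain shrinks to a single dispatch call, and each controller can be tested on its own.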

Given this RequestHandler, a server can be started in a standalone Java program using raw RxNetty as follows. This is essentially all it takes: an endpoint is brought up at port 8080 to handle the requests:

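import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public final class RxNettyExample {
    public static void main(String... args) throws Exception {
        // The handler's constructor needs a health check URI and endpoint
        RxNettyHandler handler = new RxNettyHandler("/healthcheck",
                new HealthCheckEndpoint(new HealthCheck()));
        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);
        // Start the server and block the main thread until it is shut down
        server.startAndWait();
    }
}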

This, however, is the native RxNetty way. For a cloud-ready microservice a few more things have to happen: the service should register with Eureka, respond to the health checks from Eureka, and be able to load properties using Archaius.
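The healthcheck part is the easiest to picture. As a rough sketch, assuming a health check boils down to a single method that returns an HTTP status code (which is my understanding of the handler contract Karyon expects), it could look like the following. The StatusHandler interface is a local stand-in so the example compiles without Karyon on the classpath, and DependencyHealthCheck is a hypothetical name.

```java
import java.util.function.BooleanSupplier;

public class HealthCheckSketch {

    /** Local stand-in for a health check contract: report an HTTP status code. */
    public interface StatusHandler {
        int getStatus();
    }

    /** Reports 200 while a dependency is reachable and 503 otherwise. */
    public static final class DependencyHealthCheck implements StatusHandler {
        private final BooleanSupplier dependencyUp;

        public DependencyHealthCheck(BooleanSupplier dependencyUp) {
            this.dependencyUp = dependencyUp;
        }

        @Override
        public int getStatus() {
            return dependencyUp.getAsBoolean() ? 200 : 503;
        }
    }

    public static void main(String[] args) {
        StatusHandler healthy = new DependencyHealthCheck(() -> true);
        StatusHandler unhealthy = new DependencyHealthCheck(() -> false);
        System.out.println(healthy.getStatus());
        System.out.println(unhealthy.getStatus());
    }
}
```

A trivial service can simply return 200 unconditionally; the supplier is there only to show where a real check would plug in.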

So with Karyon2, the startup in a main program looks a little different:


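import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

// RxNettyHandler is assumed to be in the same package; otherwise import it as well
public class SamplePongApp {

    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
        // The port is an assumption here: 8080 matches the standalone example above
        Karyon.forRequestHandler(8080,
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),
                KaryonEurekaModule.asBootstrapModule(),
                Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                ShutdownModule.asBootstrapModule(),
                KaryonServoModule.asBootstrapModule()
        ).startAndWaitTillShutdown();
    }
}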

Now it is essentially cloud ready. On startup, this version of the program registers cleanly with Eureka and exposes a healthcheck endpoint. It additionally exposes a neat set of admin endpoints at port 8077.


I hope this provides a good intro to using Karyon2 to develop Netflix OSS based microservices. The entire sample is available in my GitHub repo. As a follow-up, I will show how the same service can be developed using spring-cloud, which is the Spring way to create microservices.
