Wednesday, August 24, 2016

Integrating with RabbitMQ using Spring Cloud Stream

In my previous post I wrote about a very simple integration scenario between two systems - one generating a work unit and another processing that work unit - and how Spring Integration makes such an integration very easy.



Here I will demonstrate how this integration scenario can be simplified even further using Spring Cloud Stream.

I have the sample code available here - the right Maven dependencies for Spring Cloud Stream are available in the pom.xml.

Producer


So again, starting with the producer responsible for generating the work units - all that needs to be done code-wise to send messages to RabbitMQ is to have a Java configuration along these lines:

@Configuration
@EnableBinding(WorkUnitsSource.class)
@IntegrationComponentScan
public class IntegrationConfiguration {}

This looks deceptively simple but does a lot under the covers. From what I can understand and glean from the documentation, this configuration triggers the following:

1. Spring Integration message channels are created based on the classes bound through the @EnableBinding annotation. The WorkUnitsSource interface above is the definition of a custom channel called "worksChannel" and looks like this:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface WorkUnitsSource {

    String CHANNEL_NAME = "worksChannel";

    @Output
    MessageChannel worksChannel();

}

2. Based on which "binder" implementation is available at runtime (say RabbitMQ, Kafka, Redis, Gemfire), the channel in the previous step will be connected to the appropriate structures in that system - so, for example, since I want my "worksChannel" to send messages to RabbitMQ, Spring Cloud Stream takes care of automatically creating a topic exchange there.
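As an aside, switching binders is purely a dependency change. For the RabbitMQ binder used here, the relevant Maven dependency is along these lines (with the version managed via the Spring Cloud BOM):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>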

I wanted some further customizations in terms of how the data is sent to RabbitMQ - specifically, I wanted my domain objects to be serialized to JSON before being sent across, and I wanted to specify the name of the RabbitMQ exchange that the payload is sent to. This is controlled by configuration attached to the channel the following way, using a YAML file:

spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          contentType: application/json
          group: testgroup

One final detail is a way for the rest of the application to interact with Spring Cloud Stream; this can be done directly in Spring Integration by defining a message gateway:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import works.service.domain.WorkUnit;

@MessagingGateway
public interface WorkUnitGateway {
 @Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)
 void generate(WorkUnit workUnit);

}
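With this gateway in place, dispatching a work unit from anywhere in the application is just a method call - a quick sketch along these lines, assuming the WorkUnit domain object from the sample:

WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), "A sample work definition");
workUnitGateway.generate(sampleWorkUnit);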

That is essentially it - Spring Cloud Stream now wires up the entire Spring Integration flow and creates the appropriate structures in RabbitMQ.


Consumer


Similar to the producer, first I want to define the channel called "worksChannel" which would handle the incoming messages from RabbitMQ:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface WorkUnitsSink {
    String CHANNEL_NAME = "worksChannel";

    @Input
    SubscribableChannel worksChannel();
}

and let Spring Cloud Stream create the channels and RabbitMQ bindings based on this definition:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBinding(WorkUnitsSink.class)
public class IntegrationConfiguration {}

To process the messages, Spring Cloud Stream provides a listener which can be created the following way - the JSON payload is converted back to a WorkUnit based on the content type before the method is invoked:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);

    @StreamListener(WorkUnitsSink.CHANNEL_NAME)
    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}

And finally, the configuration which connects this channel to the RabbitMQ infrastructure, expressed in a YAML file:

spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          group: testgroup


Now if the producer and any number of consumers are started up, a message sent via the producer is published to a RabbitMQ topic exchange as JSON, retrieved by one of the consumers (the "group" setting ensures that the consumers share a durable queue, named along the lines of "work.exchange.testgroup", so each message is processed only once), deserialized to an object and passed on to the work processor.

A good amount of the boilerplate involved in creating the RabbitMQ infrastructure is now handled purely by convention by the Spring Cloud Stream libraries. Though Spring Cloud Stream attempts to provide a facade over raw Spring Integration, it is useful to have a basic knowledge of Spring Integration to use Spring Cloud Stream effectively.

The sample described here is available at my GitHub repository.

Monday, August 22, 2016

Integrating with Rabbit MQ using Spring Integration Java DSL

I recently attended the SpringOne conference 2016 in Las Vegas and had the good fortune to see, from near and far, some of the people that I have admired for a long time in the software world. I personally met two of them, who have actually merged some of my minor Spring Integration contributions from a few years ago - Gary Russell and Artem Bilan - and they inspired me to look again at Spring Integration, which I have not used for a while.

I was once more reminded of how Spring Integration makes any complex enterprise integration scenario look easy. I am happy to see that the Spring Integration Java based DSL is now fully integrated into the Spring Integration umbrella, along with higher level abstractions like Spring Cloud Stream (introductions thanks to my good friend and a contributor to that project, Soby Chacko) which make some of the message driven scenarios even easier.

In this post I am just revisiting a very simple integration scenario with RabbitMQ and in a later post will re-implement it using Spring Cloud Stream.

Consider a scenario where two services talk to each other via a RabbitMQ broker in between, one of them generating some kind of work, the other processing this work.



Producer


The Work unit producing/dispatching part can be expressed in code using Spring Integration Java DSL the following way:


@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate()))
                .get();
    }
}

This is eminently readable - the flow starts by reading a message off a channel called "worksChannel", transforms the message into JSON and dispatches it off using an outbound channel adapter to a RabbitMQ exchange. Now, how does the message get to the channel called "worksChannel"? I have configured it via a messaging gateway, an entry point into the Spring Integration world:

@MessagingGateway
public interface WorkUnitGateway {
 @Gateway(requestChannel = "worksChannel")
 void generate(WorkUnit workUnit);

}

So now if a Java client wanted to dispatch a "work unit" to RabbitMQ, the call would look like this:

WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), definition);
workUnitGateway.generate(sampleWorkUnit);

I have brushed over a few things here - specifically the RabbitMQ configuration; that is run-of-the-mill, however, and is available here.
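For a rough idea of what it involves though, here is a minimal sketch - the exchange and queue names are illustrative, the actual ones are in the linked configuration:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public Queue worksQueue() {
        return new Queue("work.queue");
    }

    @Bean
    public TopicExchange worksExchange() {
        return new TopicExchange("work.exchange");
    }

    @Bean
    public Binding binding() {
        // route all messages published to the exchange on to the queue
        return BindingBuilder.bind(worksQueue()).to(worksExchange()).with("#");
    }

    @Bean
    public RabbitTemplate worksRabbitTemplate() {
        // template pre-configured with the exchange that the outbound adapter publishes to
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange("work.exchange");
        return template;
    }
}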

Consumer

Along the lines of the producer, a consumer's flow would start by receiving a message from a RabbitMQ queue, transforming it to a domain model and then processing the message, expressed using the Spring Integration Java DSL the following way:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(connectionFactory, rabbitConfig.worksQueue()).concurrentConsumers(3))
                .transform(Transformers.fromJson(WorkUnit.class))
                .handle("workHandler", "process")
                .get();
    }
}

The code should be intuitive; the workHandler above is a simple Java POJO and looks like this, doing the very important job of just logging the payload:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);

    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}


That is essentially it - Spring Integration provides an awesome facade to what would have been fairly complicated code had it been attempted using straight Java and the raw RabbitMQ libraries. Spring Cloud Stream makes this entire set-up even simpler and will be the topic of a future post.


I have posted this entire code at my GitHub repo if you are interested in taking it for a spin.

Saturday, August 6, 2016

No downtime deployment using "Yet another" Cloud Foundry Gradle plugin

I have been trying my hand at writing a Gradle plugin for deploying applications to Cloud Foundry and wrote about this plugin in my previous post. I have now enhanced the plugin with support for no-downtime deploys into Cloud Foundry using two approaches - an Autopilot style deployment and the more commonly used Blue-Green style deployment.

To jump into the meat of the plugin: once it is configured cleanly, all you have to do is the following.

For an Autopilot style deployment:

./gradlew cf-push-autopilot

and for a Blue-Green deployment:

./gradlew cf-push-blue-green

and the plugin tasks would take care of the rest.

What is being solved


If you use the Cloud Foundry CLI to push an application to Cloud Foundry, the existing instances of the application are stopped, replaced and started up. This introduces downtime for the application until the new instance is up. Just to demonstrate this behavior, the following graph represents steady traffic to a website while an application is pushed to Cloud Foundry - the 30 second blip is when the new app is being started up.


Autopilot and Blue-Green style deployments


Autopilot and Blue-Green styles of deployment fix the issue by carefully orchestrating the deployment of an application such that the external facing route always points to a working version of the application.

The plugin now natively performs all the steps needed for these two styles of no-downtime deployments.
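To make the orchestration concrete, the Autopilot style boils down to the equivalent of the following CF CLI steps (the app name is illustrative); Blue-Green differs mainly in that both versions keep distinct names and the externally facing route is re-mapped between them:

cf rename my-app my-app-venerable    # keep the old version running under a temporary name
cf push my-app                       # push the new version, which takes over the existing route
cf delete my-app-venerable -f        # remove the old version once the new one is serving traffic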

Here is how the same graph looks with an Autopilot style deployment using the plugin - note that there is a slightly higher response time around the time the new application switches in. Once primed, though, the response times smooth out:



and with a Blue-Green style deployment using this plugin:


References:


1. The details of how to install and configure the plugin are available here - https://github.com/pivotalservices/ya-cf-app-gradle-plugin

2. A sample application configured with the plugin is here - https://github.com/bijukunjummen/cf-show-env

3. The load test using Gatling is available here

Tuesday, July 19, 2016

Introducing "Yet another" Cloud foundry Gradle plugin

In the process of working on an automated Jenkins pipeline for deploying a Cloud Foundry application with two of my colleagues (thanks Mark Alston and Dave Malone!), I decided to try my hand at writing a Gradle plugin to perform some of the tasks that are typically done using the command line Cloud Foundry client.

Introducing the totally unimaginatively named "ya-cf-app-gradle-plugin", with a set of Gradle tasks (dare I say opinionated!) that should help automate some of the routine steps involved in deploying a Java application to a Cloud Foundry environment. The "ya", or yet-another, part is because this is just a stand-in plugin; the authoritative plugin for Cloud Foundry will ultimately reside with the excellent CF-Java-Client project.


I have provided an extensive README with the project's documentation that should help in getting started with the plugin; the tasks should be fairly intuitive if you have previously worked with the CF CLI.

Just as an example, once the Gradle plugin is cleanly added into the build script, the following Gradle tasks are available when listed by running the "./gradlew tasks" command:





All the tasks work off a configuration provided the following way in a cfConfig block in the build script:

apply plugin: 'cf-app'

cfConfig {
 //CF Details
 ccHost = "api.local.pcfdev.io"
 ccUser = "admin"
 ccPassword = "admin"
 org = "pcfdev-org"
 space = "pcfdev-space"

 //App Details
 name = "cf-show-env"
 hostName = "cf-show-env"
 filePath = "build/libs/cf-show-env-0.1.2-SNAPSHOT.jar"
 path = ""
 domain = "local.pcfdev.io"
 instances = 2
 memory = 512

 //Env and services
 buildpack = "https://github.com/cloudfoundry/java-buildpack.git"
 environment = ["JAVA_OPTS": "-Djava.security.egd=file:/dev/./urandom", "SPRING_PROFILES_ACTIVE": "cloud"]
 services  = ["mydb"]
}

Any overrides on top of the base configuration provided this way can be done by specifying Gradle properties with a "cf.*" pattern. For example, a normal push of an application would look like this:

./gradlew cf-push

and a push with the name of the application and the host name overridden would look like this:

./gradlew cf-push -Pcf.name=Green -Pcf.hostName=demo-time-temp


All of the tasks follow the exact same pattern, depending on the cfConfig block as the authoritative source of properties along with the command line overrides. There is one task that can be used for retrieving some of the details of an app in Cloud Foundry - the task is "cf-get-app-detail". Say after deploying a canary instance of an app you wanted to run a quick test against it; the task would look along these lines, with a "project.cfConfig" structure populated with the app details once successfully invoked:

task acceptanceTest(type: Test, dependsOn: "cf-get-app-detail")  {
 doFirst() {
  systemProperty "url", "https://${project.cfConfig.applicationDetail.urls[0]}"
 }
 useJUnit {
  includeCategories 'test.AcceptanceTest'
 }
}
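Running the test against, say, the canary pushed with an overridden name earlier would then be a single command, with the same "cf.*" override pattern applying to the app detail lookup - a sketch, assuming the canary was pushed as "Green":

./gradlew acceptanceTest -Pcf.name=Green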


References:


1. The plugin is built on top of the excellent CF-Java-Client project
2. I have borrowed a lot of ideas from gradle-cf-plugin, but this is more or less a clean-room implementation
3. Here is a sample project which makes use of the plugin.

Tuesday, July 5, 2016

Spring Cloud Zuul - Writing a Filter

The Netflix OSS project Zuul serves as a gateway to backend services and provides support for adding in edge features like security and routing. In the Zuul world, specific edge features are provided by components called Zuul Filters, and writing such a filter for a Spring Cloud based project is very simple. A good reference for adding a filter is here. Here I wanted to demonstrate two small features - deciding whether a filter should act on a request, and adding a header before forwarding the request.

Writing a Zuul Filter


Writing a Zuul Filter is very easy with Spring Cloud - all we need to do is add a Spring bean which extends the ZuulFilter base class, so for this example it would look something like this:

import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.springframework.stereotype.Service;


@Service
public class PayloadTraceFilter extends ZuulFilter {

    private static final String HEADER="payload.trace";

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        return 999;
    }

    @Override
    public boolean shouldFilter() {
      ....   
    }

    @Override
    public Object run() {
     ....
    }
}

Some high level details of this implementation: it has been marked with a "filter type" of "pre", which means that this filter is called before the request is dispatched to the backend service; filterOrder determines when this specific filter is called in the chain of filters; shouldFilter determines whether this filter is invoked at all for a given request; and run contains the logic for the filter.

So to my first consideration - whether this filter should act on the flow at all. This can be decided on a request by request basis; my logic is very simple: if the request URI starts with /samplesvc then this filter should act on the request.

@Override
public boolean shouldFilter() {
    RequestContext ctx = RequestContext.getCurrentContext();
    String requestUri = ctx.getRequest().getRequestURI();
    return requestUri.startsWith("/samplesvc");
}

and to the second consideration - modifying the request headers sent to the backend service:

@Override
public Object run() {
    RequestContext ctx = RequestContext.getCurrentContext();
    ctx.addZuulRequestHeader("payload.trace", "true");
    return null;
}

A backing service receiving such a request can look for the header and act accordingly - say, in this specific case, looking at the "payload.trace" header and deciding to log the incoming message:

@RequestMapping(value = "/message", method = RequestMethod.POST)
public Resource<MessageAcknowledgement> pongMessage(@RequestBody Message input, @RequestHeader("payload.trace") boolean tracePayload) {
    if (tracePayload) {
        LOGGER.info("Received Payload: {}", input.getPayload());
    }
....

Conclusion

As demonstrated here, Spring Cloud really makes it simple to add in Zuul filters for any edge needs. If you want to explore this sample a little further, I have sample projects available in my GitHub repo.

Wednesday, June 22, 2016

Spring Cloud Zuul Support - Configuring Timeouts

Spring Cloud provides support for Netflix Zuul - a toolkit for creating edge services with routing and filtering capabilities.

Zuul Proxy support is very comprehensively documented at the Spring Cloud site. My objective here is to focus on a small set of attributes relating to handling timeouts when dealing with the proxied services.

Target Service and Gateway


To study timeouts better I have created a sample service (code available here) which takes in a configurable "delay" parameter as part of the request body; a sample request/response looks something like this:

A sample request with a 5 second delay:

{
  "id": "1",
  "payload": "Hello",
  "delay_by": 5000,
  "throw_exception": false
}


and an expected response:

{
  "id": "1",
  "received": "Hello",
  "payload": "Hello!"
}


This service is registered with an id of "sample-svc" in Eureka; a Spring Cloud Zuul proxy on top of this service has the following configuration:

zuul:
  ignoredServices: '*'
  routes:
    samplesvc:
      path: /samplesvc/**
      stripPrefix: true
      serviceId: sample-svc

Essentially: forward all requests to the /samplesvc/** URI to a service disambiguated by the name "sample-svc" via Eureka - a request to "/samplesvc/message" on the gateway, for instance, ends up (with the prefix stripped) as a call to "/message" on one of the "sample-svc" instances.

I also have a UI on top of the gateway to make testing with different delays easier:


Service Delay Tests


The gateway behaves without any timeout related issues when a low "delay" parameter is added to the service call; however, with a delay parameter of even as little as 1 to 1.5 seconds, the gateway would time out.

The reason is that if the gateway is set up to use Eureka, then the gateway uses the Netflix Ribbon component to make the actual call. Further, the Ribbon call is wrapped within Hystrix to ensure that the call remains fault tolerant. The first timeout that we are hitting is because Hystrix has a very low delay tolerance threshold by default (a 1 second execution timeout), and tweaking the Hystrix settings should get us past the first timeout.

hystrix:
  command:
    sample-svc:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 15000

Note that the Hystrix "Command Key" used for configuration is the name of the service as registered in Eureka.

This may be a little too fine-grained for this specific Zuul call; if you are okay with tweaking it across the board, then configuration along these lines should do the job:

hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 15000

With this change, the request to the service via the gateway with a delay of up to 5 seconds will now go through without any issues. If we were to go above 5 seconds, though, we would get another timeout. We are now hitting Ribbon's read timeout setting (which defaults to 5 seconds here), and this again can be configured in a fine-grained way for the specific service call by tweaking configuration which looks like this:

sample-svc:
  ribbon:
    ReadTimeout: 15000
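
and, as with the Hystrix setting, this can instead be applied across the board for all Ribbon-backed calls:

ribbon:
  ReadTimeout: 15000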


With both these timeout tweaks in place, the gateway based call should now go through.

Conclusion


The purpose was not to show ways of setting arbitrarily high timeout values, but just to show how to set values that may be more appropriate for your applications. Sensible timeouts are very important to ensure that bad service behaviors don't cascade up to the users. One thing to note is that if the gateway is configured without Ribbon and Eureka, by specifying a direct URL to a service, then these timeout settings are not relevant at all.

If you are interested in exploring this further, the samples are available here.

Tuesday, June 7, 2016

Spring-Reactive samples

Spring-Reactive aims to bring reactive programming support to Spring based projects, and this is expected to be available in the Spring 5 timeframe. My intention here is to exercise some of the very basic signatures for REST endpoints with this model.

Before I go ahead, let me acknowledge that this entire sample is completely based on the samples which Sébastien Deleuze has put together here - https://github.com/sdeleuze/spring-reactive-playground

I wanted to consider three examples: first, a case where the existing Java 8 CompletableFuture is returned as a type; second, where RxJava's Observable is returned as a type; and third, with Spring Reactor Core's Flux type.

Expected Protocol

The structure of the request and response messages handled by each of the three services is along these lines; all of them take in a request which looks like this:

{
 "id":1,
  "delay_by": 2000,
  "payload": "Hello",
  "throw_exception": false
}


The delay_by field delays the response and throw_exception makes the response error out. A sane response would be the following:

{
  "id": "1",
  "received": "Hello",
  "payload": "Response Message"
}

I will be ignoring the exceptions for this post.

CompletableFuture as a return type


Consider a service which returns a Java 8 CompletableFuture as a return type:

public CompletableFuture<MessageAcknowledgement> handleMessage(Message message) {
 return CompletableFuture.supplyAsync(() -> {
  Util.delay(message.getDelayBy());
  return new MessageAcknowledgement(message.getId(), message.getPayload(), "data from CompletableFutureService");
 }, futureExecutor);
}
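The futureExecutor above is just a plain executor bean injected into this service; a minimal sketch, assuming a fixed thread pool (the pool size is illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Bean
public ExecutorService futureExecutor() {
    // a dedicated pool so the simulated delays don't hog the common ForkJoinPool
    return Executors.newFixedThreadPool(10);
}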

The method signature of a controller which calls this service now looks like this:

@RestController
public class CompletableFutureController {

 private final CompletableFutureService aService;

 @Autowired
 public CompletableFutureController(CompletableFutureService aService) {
  this.aService = aService;
 }

 @RequestMapping(path = "/handleMessageFuture", method = RequestMethod.POST)
 public CompletableFuture<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessage(message);
 }

}

When the CompletableFuture completes the framework will ensure that the response is marshalled back appropriately.
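So for the sample request shown earlier, the acknowledgement marshalled back would look along these lines:

{
  "id": "1",
  "received": "Hello",
  "payload": "data from CompletableFutureService"
}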

RxJava Observable as a return type

Consider a service which returns an RxJava Observable as a return type:

public Observable<MessageAcknowledgement> handleMessage(Message message) {
 logger.info("About to Acknowledge");
 return Observable.just(message)
   .delay(message.getDelayBy(), TimeUnit.MILLISECONDS)
   .flatMap(msg -> {
    if (msg.isThrowException()) {
     return Observable.error(new IllegalStateException("Throwing a deliberate exception!"));
    }
    return Observable.just(new MessageAcknowledgement(message.getId(), message.getPayload(), "From RxJavaService"));
   });
}

The controller invoking such a service can directly return the Observable now, and the framework will ensure that once all the items have been emitted, the response is marshalled correctly.

@RestController
public class RxJavaController {

 private final RxJavaService aService;

 @Autowired
 public RxJavaController(RxJavaService aService) {
  this.aService = aService;
 }

 @RequestMapping(path = "/handleMessageRxJava", method = RequestMethod.POST)
 public Observable<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  System.out.println("Got Message..");
  return this.aService.handleMessage(message);
 }

}

Note that since an Observable represents a stream of 0 to many items, the response this time around is a JSON array.
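So a successful call would come back along these lines, the single acknowledgement wrapped in an array:

[
  {
    "id": "1",
    "received": "Hello",
    "payload": "From RxJavaService"
  }
]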


Spring Reactor Core Flux as a return type


Finally, if the response type is a Flux type, the framework ensures that the response is handled cleanly. The service is along these lines:

public Flux<MessageAcknowledgement> handleMessage(Message message) {
 return Flux.just(message)
   .delay(Duration.ofMillis(message.getDelayBy()))
   .map(msg -> Tuple.of(msg, msg.isThrowException()))
   .flatMap(tup -> {
    if (tup.getT2()) {
     return Flux.error(new IllegalStateException("Throwing a deliberate Exception!"));
    }
    Message msg = tup.getT1();
    return Flux.just(new MessageAcknowledgement(msg.getId(), msg.getPayload(), "Response from ReactorService"));
   });
}

and a controller making use of such a service:

@RestController
public class ReactorController {

 private final ReactorService aService;

 @Autowired
 public ReactorController(ReactorService aService) {
  this.aService = aService;
 }

 @RequestMapping(path = "/handleMessageReactor", method = RequestMethod.POST)
 public Flux<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessage(message);
 }

}

Conclusion

This is just a sampling of the kinds of return types that the Spring Reactive project supports; the set of possible return types is much larger than this - here is a far more comprehensive example.

I look forward to when the reactive programming model becomes available in the core Spring framework.

The samples presented in this blog post are available at my GitHub repository.