Sunday, January 25, 2015

Learning Netflix Governator - Part 2

To continue from the previous entry on some basic learnings on Netflix Governator, here I will cover one more enhancement that Netflix Governator brings to Google Guice - Lifecycle Management

Lifecycle Management essentially provides hooks into the different lifecycle phases that an object is taken through, to quote the wiki article on Governator:

Allocation (via Guice)
     |
     v
Pre Configuration
     |
     v
Configuration
     |
     V
Set Resources
     |
     V
Post Construction
     |
     V
Validation and Warm Up
     |
     V
  -- application runs until termination, then... --    
     |
     V
Pre Destroy

To illustrate this, consider the following code:

package sample.gov;

import com.google.inject.Inject;
import com.netflix.governator.annotations.AutoBindSingleton;
import sample.dao.BlogDao;
import sample.model.BlogEntry;
import sample.service.BlogService;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@AutoBindSingleton(baseClass = BlogService.class)
public class DefaultBlogService implements BlogService {
    private final BlogDao blogDao;

    @Inject
    public DefaultBlogService(BlogDao blogDao) {
        this.blogDao = blogDao;
    }

    @Override
    public BlogEntry get(long id) {
        return this.blogDao.findById(id);
    }

    @PostConstruct
    public void postConstruct() {
        System.out.println("Post-construct called!!");
    }
    @PreDestroy
    public void preDestroy() {
        System.out.println("Pre-destroy called!!");
    }
}

Here two methods have been annotated with @PostConstruct and @PreDestroy annotations to hook into these specific phases of the Governator's lifecycle for this object. The neat thing is that these annotations are not Governator specific but are JSR-250 annotations that are now baked into the JDK.

Calling the test for this class appropriately calls the annotated methods, here is a sample test:

mport com.google.inject.Injector;
import com.netflix.governator.guice.LifecycleInjector;
import com.netflix.governator.lifecycle.LifecycleManager;
import org.junit.Test;
import sample.service.BlogService;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

public class SampleWithGovernatorTest {

    @Test
    public void testExampleBeanInjection() throws Exception {
        Injector injector  = LifecycleInjector
                .builder()
                .withModuleClass(SampleModule.class)
                .usingBasePackages("sample.gov")
                .build()
                .createInjector();

        LifecycleManager manager = injector.getInstance(LifecycleManager.class);

        manager.start();

        BlogService blogService = injector.getInstance(BlogService.class);
        assertThat(blogService.get(1l), is(notNullValue()));
        manager.close();
    }

}

Spring Framework has supported a similar mechanism for a long time - so the exact same JSR-250 based annotations work for Spring bean too.

If you are interested in exploring this further, here is my github project with samples with Lifecycle management.

Monday, January 19, 2015

Learning Netflix Governator - Part 1

I have been working with Netflix Governator for the last few days and got to try out a small sample using Governator as a way to compare it with the dependency injection feature set of Spring Framework. The following is by no means comprehensive, I will expand on this in the next series of posts.

So Governator for the uninitiated is an extension to Google Guice enhancing it with some Spring like features, to quote the Governator site:

classpath scanning and automatic binding, lifecycle management, configuration to field mapping, field validation and parallelized object warmup.

Here I will demonstrate two features, classpath scanning and automatic binding.

Basic Dependency Injection

Consider a BlogService, depending on a BlogDao:

public class DefaultBlogService implements BlogService {
    private final BlogDao blogDao;

    public DefaultBlogService(BlogDao blogDao) {
        this.blogDao = blogDao;
    }

    @Override
    public BlogEntry get(long id) {
        return this.blogDao.findById(id);
    }
}

If I were using Spring to define the dependency between these two components, the following would be the configuration:

package sample.spring;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import sample.dao.BlogDao;
import sample.service.BlogService;

@Configuration
public class SampleConfig {

    @Bean
    public BlogDao blogDao() {
        return new DefaultBlogDao();
    }

    @Bean
    public BlogService blogService() {
        return new DefaultBlogService(blogDao());
    }
}

In Spring, the dependency configuration is specified in a class annotated with @Configuration annotation. The methods annotated with @Bean return the components, note how the blogDao is being injected through constructor injection in blogService method.

A unit test for this configuration is the following:

package sample.spring;

import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import sample.service.BlogService;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

public class SampleSpringExplicitTest {

    @Test
    public void testSpringInjection() {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SampleConfig.class);
        context.refresh();

        BlogService blogService = context.getBean(BlogService.class);
        assertThat(blogService.get(1l), is(notNullValue()));
        context.close();
    }

}


Note that Spring provides good support for unit testing, a better test would be the following:

package sample.spring;

package sample.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import sample.service.BlogService;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SampleSpringAutowiredTest {

    @Autowired
    private BlogService blogService;

    @Test
    public void testSpringInjection() {
        assertThat(blogService.get(1l), is(notNullValue()));
    }

    @Configuration
    @ComponentScan("sample.spring")
    public static class SpringConig {

    }

}


This is basic dependency injection, so to specify such a dependency Governator itself is not required, Guice is sufficient, this is how the configuration would look using Guice Modules:

package sample.guice;

import com.google.inject.AbstractModule;
import sample.dao.BlogDao;
import sample.service.BlogService;

public class SampleModule extends AbstractModule{

    @Override
    protected void configure() {
        bind(BlogDao.class).to(DefaultBlogDao.class);
        bind(BlogService.class).to(DefaultBlogService.class);
    }
}

and a Unit test for this configuration is the following:


package sample.guice;

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.junit.Test;
import sample.service.BlogService;

import static org.hamcrest.Matchers.*;
import static org.hamcrest.MatcherAssert.*;

public class SampleModuleTest {

    @Test
    public void testExampleBeanInjection() {
        Injector injector = Guice.createInjector(new SampleModule());
        BlogService blogService = injector.getInstance(BlogService.class);
        assertThat(blogService.get(1l), is(notNullValue()));
    }

}


Classpath Scanning and Autobinding

Classpath scanning is a way to detect the components by looking for markers in the classpath. A sample with Spring should clarify this:

@Repository
public class DefaultBlogDao implements BlogDao {
    ....
}

@Service
public class DefaultBlogService implements BlogService {

    private final BlogDao blogDao;

    @Autowired
    public DefaultBlogService(BlogDao blogDao) {
        this.blogDao = blogDao;
    }
    ...
}

Here the annotations @Service, @Repository are used as markers to indicate that these are components and the dependencies are specified by the @Autowired annotation on the constructor of the DefaultBlogService.

Given this the configuration is now simplified, we just need to provide the package name that should be scanned for such annotated components and this is how a full test would look:
package sample.spring;
...
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SampleSpringAutowiredTest {

    @Autowired
    private BlogService blogService;

    @Test
    public void testSpringInjection() {
        assertThat(blogService.get(1l), is(notNullValue()));
    }

    @Configuration
    @ComponentScan("sample.spring")
    public static class SpringConig {}
}



Governator provides a similar kind of a support:
@AutoBindSingleton(baseClass = BlogDao.class)
public class DefaultBlogDao implements BlogDao {
    ....
}

@AutoBindSingleton(baseClass = BlogService.class)
public class DefaultBlogService implements BlogService {
    private final BlogDao blogDao;

    @Inject
    public DefaultBlogService(BlogDao blogDao) {
        this.blogDao = blogDao;
    }
    ....
}

Here, @AutoBindSingleton annotation is being used as a marker annotation to define the guice binding, given this a test with classpath scanning is the following:

package sample.gov;

import com.google.inject.Injector;
import com.netflix.governator.guice.LifecycleInjector;
import com.netflix.governator.lifecycle.LifecycleManager;
import org.junit.Test;
import sample.service.BlogService;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class SampleWithGovernatorTest {

    @Test
    public void testExampleBeanInjection() throws Exception {
        Injector injector  = LifecycleInjector
                .builder()
                .withModuleClass(SampleModule.class)
                .usingBasePackages("sample.gov")
                .build()
                .createInjector();

        LifecycleManager manager = injector.getInstance(LifecycleManager.class);

        manager.start();

        BlogService blogService = injector.getInstance(BlogService.class);
        assertThat(blogService.get(1l), is(notNullValue()));
    }

}

See how the package to be scanned is specified using a LifecycleInjector component of Governator, this autodetects the components and wires them together.

Just to wrap the classpath scanning and Autobinding features, Governator like Spring provides a support for junit testing and a better test would be the following:

package sample.gov;

import com.google.inject.Injector;
import com.netflix.governator.guice.LifecycleTester;
import org.junit.Rule;
import org.junit.Test;
import sample.service.BlogService;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

public class SampleWithGovernatorJunitSupportTest {

    @Rule
    public LifecycleTester tester = new LifecycleTester();

    @Test
    public void testExampleBeanInjection() throws Exception {
        tester.start();
        Injector injector = tester
                .builder()
                .usingBasePackages("sample.gov")
                .build()
                .createInjector();

        BlogService blogService = injector.getInstance(BlogService.class);
        assertThat(blogService.get(1l), is(notNullValue()));
    }

}

Conclusion
If you are interested in exploring this further I have a sample in this github project, I would be expanding this project as I learn more about Governator

Sunday, January 4, 2015

Using Netflix Hystrix annotations with Spring

I can't think of a better way to describe a specific feature of Netflix Hystrix library than by quoting from its home page:

Latency and Fault Tolerance by:
Stop cascading failures. Fallbacks and graceful degradation. Fail fast and rapid recovery.

Thread and semaphore isolation with circuit breakers.

I saw a sample demonstrated by Josh Long(@starbuxman) which makes use of Hystrix integrated with Spring - the specific code is here. The sample makes use of annotations to hystrix enable a service class.

My objective here is to recreate a similar set-up in a smaller unit test mode. With that in mind, consider the following interface which is going to be made fault tolerant using Hystrix library:

package hystrixtest;

public interface RemoteCallService {

    String call(String request) throws Exception;

}

And a dummy implementation for it. The dummy implementation delegates to a mock implementation which in-turn fails the first two times it is called and succeeds with the third call:

package hystrixtest;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.mockito.Mockito.*;


public class DummyRemoteCallService implements RemoteCallService {

    private RemoteCallService mockedDelegate;

    public DummyRemoteCallService() {
        try {
            mockedDelegate = mock(RemoteCallService.class);
            when(mockedDelegate.call(anyString()))
                    .thenThrow(new RuntimeException("Deliberately throwing an exception 1"))
                    .thenThrow(new RuntimeException("Deliberately throwing an exception 2"))
                    .thenAnswer(new Answer<String>() {
                        @Override
                        public String answer(InvocationOnMock invocationOnMock) throws Throwable {
                            return (String) invocationOnMock.getArguments()[0];
                        }
                    });
        }catch(Exception e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    @HystrixCommand(fallbackMethod = "fallBackCall")
    public String call(String request) throws Exception {
        return this.mockedDelegate.call(request);
    }

    public String fallBackCall(String request) {
        return "FALLBACK: " + request;
    }
}

The remote call has been annotated with the @Hystrixcommand annotation with a basic configuration to fall back to a "fallBackCall" method in case of a failed remote call.

Now, as you can imagine, there has to be something in the Hystrix library which should intercept calls annotated with @HystrixCommand annotation and makes it fault tolerant. This is a working test which wraps the necessary infrastructure together - in essence, Hystrix library provides a companion AOP based library that intercepts the calls. I have used Spring testing support here to bootstrap the AOP infrastructure, to create the HystrixCommandAspect as a bean, the call goes to the "fallBackCall" for the first two failed calls and succeeds the third time around:


package hystrixtest;

import com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class TestRemoteCallServiceHystrix {

    @Autowired
    private RemoteCallService remoteCallService ;

    @Test
    public void testRemoteCall() throws Exception{
        assertThat(this.remoteCallService.call("test"), is("FALLBACK: test"));
        assertThat(this.remoteCallService.call("test"), is("FALLBACK: test"));
        assertThat(this.remoteCallService.call("test"), is("test"));
    }

    @Configuration
    @EnableAspectJAutoProxy
    public static class SpringConfig {

        @Bean
        public HystrixCommandAspect hystrixCommandAspect() {
            return new HystrixCommandAspect();
        }

        @Bean
        public RemoteCallService remoteCallService() {
            return new DummyRemoteCallService();
        }
    }
}

Spring-Cloud provides an easier way to configure the Netflix libraries for Spring-Boot based projects and if I were to use this library the test would transform to this, a bunch of configuration is now commented out with the help of Spring-Boot:

package hystrixtest;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;


@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration
public class TestRemoteCallServiceHystrix {

    @Autowired
    private RemoteCallService remoteCallService;

    @Test
    public void testRemoteCall() throws Exception {
        assertThat(this.remoteCallService.call("test"), is("FALLBACK: test"));
        assertThat(this.remoteCallService.call("test"), is("FALLBACK: test"));
        assertThat(this.remoteCallService.call("test"), is("test"));
    }

    @Configuration
    @EnableAutoConfiguration
//    @EnableAspectJAutoProxy
    @EnableHystrix
    public static class SpringConfig {

//        @Bean
//        public HystrixCommandAspect hystrixCommandAspect() {
//            return new HystrixCommandAspect();
//        }

        @Bean
        public RemoteCallService remoteCallService() {
            return new DummyRemoteCallService();
        }
    }
}

If you are interested in exploring this sample further, here is the github repo with the working tests.

Saturday, December 27, 2014

Spring retry - ways to integrate with your project

If you have a need to implement robust retry logic in your code, a proven way would be to use the spring retry library. My objective here is not to show how to use the spring retry project itself, but in demonstrating different ways that it can be integrated into your codebase.

Consider a service to invoke an external system:

package retry.service;

public interface RemoteCallService {
    String call() throws Exception;
}


Assume that this call can fail and you want the call to be retried thrice with a 2 second delay each time the call fails, so to simulate this behavior I have defined a mock service using Mockito this way, note that this is being returned as a mocked Spring bean:

@Bean
public RemoteCallService remoteCallService() throws Exception {
    RemoteCallService remoteService = mock(RemoteCallService.class);
    when(remoteService.call())
            .thenThrow(new RuntimeException("Remote Exception 1"))
            .thenThrow(new RuntimeException("Remote Exception 2"))
            .thenReturn("Completed");
    return remoteService;
}
So essentially this mocked service fails 2 times and succeeds with the third call.

And this is the test for the retry logic:

public class SpringRetryTests {

    @Autowired
    private RemoteCallService remoteCallService;

    @Test
    public void testRetry() throws Exception {
        String message = this.remoteCallService.call();
        verify(remoteCallService, times(3)).call();
        assertThat(message, is("Completed"));
    }
}

We are ensuring that the service is called 3 times to account for the first two failed calls and the third call which succeeds.

If we were to directly incorporate spring-retry at the point of calling this service, then the code would have looked like this:
@Test
public void testRetry() throws Exception {
    String message = this.retryTemplate.execute(context -> this.remoteCallService.call());
    verify(remoteCallService, times(3)).call();
    assertThat(message, is("Completed"));
}

This is not ideal however, a better way would be where the callers don't have have to be explicitly aware of the fact that there is a retry logic in place.

Given this, the following are the approaches to incorporate Spring-retry logic.

Approach 1: Custom Aspect to incorporate Spring-retry

This approach should be fairly intuitive as the retry logic can be considered a cross cutting concern and a good way to implement a cross cutting concern is using Aspects. An aspect which incorporates the Spring-retry would look something along these lines:

package retry.aspect;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.support.RetryTemplate;

@Aspect
public class RetryAspect {

    private static Logger logger = LoggerFactory.getLogger(RetryAspect.class);

    @Autowired
    private RetryTemplate retryTemplate;

    @Pointcut("execution(* retry.service..*(..))")
    public void serviceMethods() {
        //
    }

    @Around("serviceMethods()")
    public Object aroundServiceMethods(ProceedingJoinPoint joinPoint) {
        try {
            return retryTemplate.execute(retryContext -> joinPoint.proceed());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
}

This aspect intercepts the remote service call and delegates the call to the retryTemplate. A full working test is here.

Approach 2: Using Spring-retry provided advice

Out of the box Spring-retry project provides an advice which takes care of ensuring that targeted services can be retried. The AOP configuration to weave the advice around the service requires dealing with raw xml as opposed to the previous approach where the aspect can be woven using Spring Java configuration. The xml configuration looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

 <aop:config>
  <aop:pointcut id="transactional"
       expression="execution(* retry.service..*(..))" />
  <aop:advisor pointcut-ref="transactional"
      advice-ref="retryAdvice" order="-1"/>
 </aop:config>

</beans>

The full working test is here.

Approach 3: Declarative retry logic

This is the recommended approach, you will see that the code is far more concise than with the previous two approaches. With this approach, the only thing that needs to be done is to declaratively indicate which methods need to be retried:

package retry.service;

import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;

public interface RemoteCallService {
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000))
    String call() throws Exception;
}

and a full test which makes use of this declarative retry logic, also available here:

package retry;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import retry.service.RemoteCallService;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.*;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SpringRetryDeclarativeTests {

    @Autowired
    private RemoteCallService remoteCallService;

    @Test
    public void testRetry() throws Exception {
        String message = this.remoteCallService.call();
        verify(remoteCallService, times(3)).call();
        assertThat(message, is("Completed"));
    }

    @Configuration
    @EnableRetry
    public static class SpringConfig {

        @Bean
        public RemoteCallService remoteCallService() throws Exception {
            RemoteCallService remoteService = mock(RemoteCallService.class);
            when(remoteService.call())
                    .thenThrow(new RuntimeException("Remote Exception 1"))
                    .thenThrow(new RuntimeException("Remote Exception 2"))
                    .thenReturn("Completed");
            return remoteService;
        }
    }
}

The @EnableRetry annotation activates the processing of @Retryable annotated methods and internally uses logic along the lines of approach 2 without the end user needing to be explicit about it.

I hope this gives you a slightly better taste for how to incorporate Spring-retry in your project. All the code that I have demonstrated here is also available in my github project here: https://github.com/bijukunjummen/test-spring-retry

Tuesday, December 23, 2014

Solving "Water buckets" problem using Scala

I recently came across a puzzle called the "Water Buckets" problem in this book, which totally stumped me.


You have a 12-gallon bucket, an 8-gallon bucket and a 5-gallon bucket. The 12-gallon bucket is full of water and the other two are empty. Without using any additional water how can you divide the twelve gallons of water equally so that two of the three buckets have exactly 6 gallons of water in them?


I and my nephew spent a good deal of time trying to solve it and ultimately gave up.

I remembered then that I have seen a programmatic solution to a similar puzzle being worked out in the "Functional Programming Principles in Scala" Coursera course by Martin Odersky.

This is the gist to the solution completely copied from the course:



and running this program spits out the following 7 step solution! (index 0 is the 12-gallon bucket, 1 is the 8-gallon bucket and 2 is the 5-gallon bucket)

Pour(0,1) 
Pour(1,2) 
Pour(2,0) 
Pour(1,2) 
Pour(0,1) 
Pour(1,2) 
Pour(2,0)

If you are interested in learning more about the code behind this solution, the best way is to follow the week 7 of the Coursera course that I have linked above, Martin Odersky does a fantastic job of seemingly coming up with a solution on the fly!.

Saturday, December 13, 2014

RabbitMQ - Processing messages serially using Spring integration Java DSL

If you ever have a need to process messages serially with RabbitMQ with a cluster of listeners processing the messages, the best way that I have seen is to use a "exclusive consumer" flag on a listener with 1 thread on each listener processing the messages.

Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially. There is a catch however, I will go over it later.

Let me demonstrate this behavior with a Spring Boot and Spring Integration based RabbitMQ message consumer.

First, this is the configuration for setting up a queue using Spring java configuration, note that since this is a Spring Boot application, it automatically creates a RabbitMQ connection factory when the Spring-amqp library is added to the list of dependencies:

@Configuration
@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue sampleQueue() {
        return new Queue("sample.queue", true, false, false);
    }

}

Given this sample queue, a listener which gets the messages from this queue and processes them looks like this, the flow is written using the excellent Spring integration Java DSL library:

@Configuration
public class RabbitInboundFlow {
    private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setExclusive(true);
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
                .transform(Transformers.objectToString())
                .handle((m) -> {
                    logger.info("Processed  {}", m.getPayload());
                })
                .get();
    }

}


The flow is very concisely expressed in the inboundFlow method, a message payload from RabbitMQ is transformed from byte array to String and finally processed by simply logging the message to the logs

The important part of the flow is the listener configuration, note the flag which sets the consumer to be an exclusive consumer and within this consumer the number of threads processing is set to 1. Given this even if multiple instances of the application is started up only 1 of the listeners will be able to connect and process messages.


Now for the catch, consider a case where the processing of messages takes a while to complete and rolls back during processing of the message. If the instance of the application handling the message were to be stopped in the middle of processing such a message, then the behavior is a different instance will start handling the messages in the queue, when the stopped instance rolls back the message, the rolled back message is then delivered to the new exclusive consumer, thus getting a message out of order.

If you are interested in exploring this further, here is a github project to play with this feature: https://github.com/bijukunjummen/test-rabbit-exclusive

Saturday, November 29, 2014

Spring RestTemplate with a linked resource

Spring Data REST is an awesome project that provides mechanisms to expose the resources underlying a Spring Data based repository as REST resources.

Exposing a service with a linked resource


Consider two simple JPA based entities, Course and Teacher:

@Entity
@Table(name = "teachers")
public class Teacher {
 @Id
 @GeneratedValue(strategy = GenerationType.AUTO)
 @Column(name = "id")
 private Long id;

 @Size(min = 2, max = 50)
 @Column(name = "name")
 private String name;

 @Column(name = "department")
 @Size(min = 2, max = 50)
 private String department;    
    ...
}

@Entity
@Table(name = "courses")
public class Course {
 @Id
 @GeneratedValue(strategy = GenerationType.AUTO)
 @Column(name = "id")
 private Long id;

 @Size(min = 1, max = 10)
 @Column(name = "coursecode")
 private String courseCode;

 @Size(min = 1, max = 50)
 @Column(name = "coursename")
 private String courseName;

 @ManyToOne
 @JoinColumn(name = "teacher_id")
 private Teacher teacher;
 
       ....
}

essentially the relation looks like this:

Now, all it takes to expose these entities as REST resources is adding a @RepositoryRestResource annotation on their JPA based Spring Data repositories this way, first for the "Teacher" resource:
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import univ.domain.Teacher;

@RepositoryRestResource
public interface TeacherRepo extends JpaRepository<Teacher, Long> {
}

and for exposing the Course resource:

@RepositoryRestResource
public interface CourseRepo extends JpaRepository<Course, Long> {
}

With this done and assuming a few teachers and a few courses are already in the datastore, a GET on courses would yield a response of the following type:

{
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/api/courses{?page,size,sort}",
      "templated" : true
    }
  },
  "_embedded" : {
    "courses" : [ {
      "courseCode" : "Course1",
      "courseName" : "Course Name 1",
      "version" : 0,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/api/courses/1"
        },
        "teacher" : {
          "href" : "http://localhost:8080/api/courses/1/teacher"
        }
      }
    }, {
      "courseCode" : "Course2",
      "courseName" : "Course Name 2",
      "version" : 0,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/api/courses/2"
        },
        "teacher" : {
          "href" : "http://localhost:8080/api/courses/2/teacher"
        }
      }
    } ]
  },
  "page" : {
    "size" : 20,
    "totalElements" : 2,
    "totalPages" : 1,
    "number" : 0
  }
}

and a specific course looks like this:
{
  "courseCode" : "Course1",
  "courseName" : "Course Name 1",
  "version" : 0,
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/api/courses/1"
    },
    "teacher" : {
      "href" : "http://localhost:8080/api/courses/1/teacher"
    }
  }
}

If you are wondering what the "_links", "_embedded" are - Spring Data REST uses Hypertext Application Language(or HAL for short) to represent the links, say the one between a course and a teacher.

HAL Based REST service - Using RestTemplate


Given this HAL based REST service, the question that I had in my mind was how to write a client to this service. I am sure there are better ways of doing this, but what follows worked for me and I welcome any cleaner ways of writing the client.

First, I modified the RestTemplate to register a custom Json converter that understands HAL based links:

public RestTemplate getRestTemplateWithHalMessageConverter() {
 RestTemplate restTemplate = new RestTemplate();
 List<HttpMessageConverter<?>> existingConverters = restTemplate.getMessageConverters();
 List<HttpMessageConverter<?>> newConverters = new ArrayList<>();
 newConverters.add(getHalMessageConverter());
 newConverters.addAll(existingConverters);
 restTemplate.setMessageConverters(newConverters);
 return restTemplate;
}

private HttpMessageConverter getHalMessageConverter() {
 ObjectMapper objectMapper = new ObjectMapper();
 objectMapper.registerModule(new Jackson2HalModule());
 MappingJackson2HttpMessageConverter halConverter = new TypeConstrainedMappingJackson2HttpMessageConverter(ResourceSupport.class);
 halConverter.setSupportedMediaTypes(Arrays.asList(HAL_JSON));
 halConverter.setObjectMapper(objectMapper);
 return halConverter;
}

The Jackson2HalModule is provided by the Spring HATEOS project and understands HAL representation.


Given this shiny new RestTemplate, first let us create a Teacher entity:

Teacher teacher1 = new Teacher();
teacher1.setName("Teacher 1");
teacher1.setDepartment("Department 1");
URI teacher1Uri =
  testRestTemplate.postForLocation("http://localhost:8080/api/teachers", teacher1);

Note that when the entity is created, the response is a http status code of 201 with the Location header pointing to the uri of the newly created resource, Spring RestTemplate provides a neat way of posting and getting hold of this Location header through an API. So now we have a teacher1Uri representing the newly created teacher.

Given this teacher URI, let us now retrieve the teacher, the raw json for the teacher resource looks like the following:
{
  "name" : "Teacher 1",
  "department" : "Department 1",
  "version" : 0,
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/api/teachers/1"
    }
  }
}

and to retrieve this using RestTemplate:
ResponseEntity<Resource<Teacher>> teacherResponseEntity
  = testRestTemplate.exchange("http://localhost:8080/api/teachers/1", HttpMethod.GET, null, new ParameterizedTypeReference<Resource<Teacher>>() {
});

Resource<Teacher> teacherResource = teacherResponseEntity.getBody();

Link teacherLink = teacherResource.getLink("self");
String teacherUri = teacherLink.getHref();

Teacher teacher = teacherResource.getContent();

Jackson2HalModule is the one which helps unpack the links this cleanly and to get hold of the Teacher entity itself. I have previously explained ParameterizedTypeReference here.


Now, to a more tricky part, creating a Course.

Creating a course is tricky as it has a relation to the Teacher and representing this relation using HAL is not that straightforward. A raw POST to create the course would look like this:

 {
      "courseCode" : "Course1",
      "courseName" : "Course Name 1",
      "version" : 0,
      "teacher" : "http://localhost:8080/api/teachers/1"
}

Note how the reference to the teacher is a URI, this is how HAL represents an embedded reference specifically for a POST'ed content, so now to get this form through RestTemplate -

First to create a Course:

Course course1 = new Course();
course1.setCourseCode("Course1");
course1.setCourseName("Course Name 1");

At this point, it will be easier to handle providing the teacher link by dealing with a json tree representation and adding in the teacher link as the teacher uri:

ObjectMapper objectMapper = getObjectMapperWithHalModule();
ObjectNode jsonNodeCourse1 = (ObjectNode) objectMapper.valueToTree(course1);
jsonNodeCourse1.put("teacher", teacher1Uri.getPath());

and posting this should create the course with the linked teacher:

URI course1Uri = testRestTemplate.postForLocation(coursesUri, jsonNodeCourse1);

and to retrieve this newly created Course:

ResponseEntity<Resource<Course>> courseResponseEntity
  = testRestTemplate.exchange(course1Uri, HttpMethod.GET, null, new ParameterizedTypeReference<Resource<Course>>() {
});

Resource<Course> courseResource = courseResponseEntity.getBody();
Link teacherLinkThroughCourse = courseResource.getLink("teacher");

This concludes how to use the RestTemplate to create and retrieve a linked resource, alternate ideas are welcome.

If you are interested in exploring this further, the entire sample is available at this github repo -  and the test is here


References:

Hypertext Application Language(or HAL for short)
HAL Specification
Spring RestTemplate