Saturday, December 13, 2014

RabbitMQ - Processing messages serially using Spring integration Java DSL

If you ever need to process messages serially with RabbitMQ while a cluster of listeners consumes them, the best way that I have seen is to set the "exclusive consumer" flag on each listener and give each listener a single thread for processing the messages.

The exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and the single thread on that consumer ensures that the messages are processed serially. There is a catch, however, which I will go over later.

Let me demonstrate this behavior with a Spring Boot and Spring Integration based RabbitMQ message consumer.

First, this is the configuration for setting up a queue using Spring Java configuration. Note that since this is a Spring Boot application, it automatically creates a RabbitMQ connection factory when the Spring AMQP library is added to the list of dependencies:

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue sampleQueue() {
        return new Queue("sample.queue", true, false, false);
    }

}

Given this sample queue, a listener which gets the messages from this queue and processes them looks like this. The flow is written using the excellent Spring Integration Java DSL library:

@Configuration
public class RabbitInboundFlow {
    private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);

    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setExclusive(true);
        return listenerContainer;
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
                .transform(Transformers.objectToString())
                .handle((m) -> {
                    logger.info("Processed  {}", m.getPayload());
                })
                .get();
    }

}


The flow is very concisely expressed in the inboundFlow method: a message payload from RabbitMQ is transformed from a byte array to a String and finally processed by simply writing it to the logs.

The important part of the flow is the listener configuration. Note the flag which sets the consumer to be an exclusive consumer, and that within this consumer the number of processing threads is set to 1. Given this, even if multiple instances of the application are started up, only 1 of the listeners will be able to connect and process messages.
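To illustrate why a single processing thread gives serial handling, here is a minimal sketch in plain Java. It does not use RabbitMQ or Spring at all: a single-thread executor stands in for the one consumer thread, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrdering {

    // Hands every message to one worker thread and returns the order of processing.
    static List<String> process(List<String> messages) {
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        // A single worker thread stands in for the one consumer thread (an assumption of this sketch).
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (String message : messages) {
            worker.execute(() -> processed.add(message));
        }
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("m1", "m2", "m3")));
    }
}
```

With one worker the tasks run one at a time in submission order, which is the property the listener container relies on when its consumer count is 1.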


Now for the catch. Consider a case where the processing of a message takes a while to complete and rolls back during processing. If the instance of the application handling the message is stopped in the middle of processing such a message, a different instance starts handling the messages in the queue. When the stopped instance rolls back its message, the rolled-back message is delivered to the new exclusive consumer, which therefore receives a message out of order.
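The sequence of events can be sketched with a toy model. This is plain Java with an in-memory deque standing in for the queue, not RabbitMQ client code; the message names and the exact timing of the takeover are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class ExclusiveConsumerRedelivery {

    // Returns the order in which the second instance ends up processing messages.
    static List<String> simulate() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        List<String> processedByB = new ArrayList<>();

        // Instance A, the exclusive consumer, takes m1 and is still busy with it.
        String inFlightOnA = queue.pollFirst();

        // A is stopped; instance B becomes the exclusive consumer and handles m2.
        processedByB.add(queue.pollFirst());

        // A's rollback completes only now, so m1 goes back onto the queue.
        queue.addFirst(inFlightOnA);

        // B drains whatever is left.
        while (!queue.isEmpty()) {
            processedByB.add(queue.pollFirst());
        }
        return processedByB;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

In this model instance B sees m2 before m1, which is the out-of-order delivery described above.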

If you are interested in exploring this further, here is a github project to play with this feature: https://github.com/bijukunjummen/test-rabbit-exclusive

Saturday, November 29, 2014

Spring RestTemplate with a linked resource

Spring Data REST is an awesome project that provides mechanisms to expose the resources underlying a Spring Data based repository as REST resources.

Exposing a service with a linked resource


Consider two simple JPA based entities, Course and Teacher:

@Entity
@Table(name = "teachers")
public class Teacher {
 @Id
 @GeneratedValue(strategy = GenerationType.AUTO)
 @Column(name = "id")
 private Long id;

 @Size(min = 2, max = 50)
 @Column(name = "name")
 private String name;

 @Column(name = "department")
 @Size(min = 2, max = 50)
 private String department;    
    ...
}

@Entity
@Table(name = "courses")
public class Course {
 @Id
 @GeneratedValue(strategy = GenerationType.AUTO)
 @Column(name = "id")
 private Long id;

 @Size(min = 1, max = 10)
 @Column(name = "coursecode")
 private String courseCode;

 @Size(min = 1, max = 50)
 @Column(name = "coursename")
 private String courseName;

 @ManyToOne
 @JoinColumn(name = "teacher_id")
 private Teacher teacher;
 
       ....
}

Essentially, the relation is many-to-one: each Course refers to a single Teacher.

Now, all it takes to expose these entities as REST resources is adding a @RepositoryRestResource annotation on their JPA based Spring Data repositories this way, first for the "Teacher" resource:
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import univ.domain.Teacher;

@RepositoryRestResource
public interface TeacherRepo extends JpaRepository<Teacher, Long> {
}

and for exposing the Course resource:

@RepositoryRestResource
public interface CourseRepo extends JpaRepository<Course, Long> {
}

With this done and assuming a few teachers and a few courses are already in the datastore, a GET on courses would yield a response of the following type:

{
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/api/courses{?page,size,sort}",
      "templated" : true
    }
  },
  "_embedded" : {
    "courses" : [ {
      "courseCode" : "Course1",
      "courseName" : "Course Name 1",
      "version" : 0,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/api/courses/1"
        },
        "teacher" : {
          "href" : "http://localhost:8080/api/courses/1/teacher"
        }
      }
    }, {
      "courseCode" : "Course2",
      "courseName" : "Course Name 2",
      "version" : 0,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/api/courses/2"
        },
        "teacher" : {
          "href" : "http://localhost:8080/api/courses/2/teacher"
        }
      }
    } ]
  },
  "page" : {
    "size" : 20,
    "totalElements" : 2,
    "totalPages" : 1,
    "number" : 0
  }
}

and a specific course looks like this:
{
  "courseCode" : "Course1",
  "courseName" : "Course Name 1",
  "version" : 0,
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/api/courses/1"
    },
    "teacher" : {
      "href" : "http://localhost:8080/api/courses/1/teacher"
    }
  }
}

If you are wondering what "_links" and "_embedded" are: Spring Data REST uses Hypertext Application Language (or HAL for short) to represent the links, say the one between a course and a teacher.
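To make the shape of the "_links" section concrete, here is a small sketch that walks an in-memory stand-in for the course JSON shown above. It uses plain maps rather than a JSON library, and the HalLinks class and its methods are hypothetical helpers, not part of Spring HATEOAS.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class HalLinks {

    // Returns the href of the given link relation, or null if the resource has no such link.
    @SuppressWarnings("unchecked")
    static String href(Map<String, Object> resource, String rel) {
        Map<String, Object> links = (Map<String, Object>) resource.get("_links");
        if (links == null) {
            return null;
        }
        Map<String, Object> link = (Map<String, Object>) links.get(rel);
        return link == null ? null : (String) link.get("href");
    }

    // Builds a map with the same structure as the single-course response.
    static Map<String, Object> sampleCourse() {
        Map<String, Object> links = new HashMap<>();
        links.put("self", Collections.singletonMap("href", "http://localhost:8080/api/courses/1"));
        links.put("teacher", Collections.singletonMap("href", "http://localhost:8080/api/courses/1/teacher"));

        Map<String, Object> course = new HashMap<>();
        course.put("courseCode", "Course1");
        course.put("courseName", "Course Name 1");
        course.put("_links", links);
        return course;
    }

    public static void main(String[] args) {
        System.out.println(href(sampleCourse(), "teacher"));
    }
}
```

Each link is keyed by its relation name ("self", "teacher") and carries the target in "href", so a client follows a relation by name instead of building URLs itself.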

HAL Based REST service - Using RestTemplate


Given this HAL based REST service, the question that I had in my mind was how to write a client to this service. I am sure there are better ways of doing this, but what follows worked for me and I welcome any cleaner ways of writing the client.

First, I modified the RestTemplate to register a custom Json converter that understands HAL based links:

public RestTemplate getRestTemplateWithHalMessageConverter() {
 RestTemplate restTemplate = new RestTemplate();
 List<HttpMessageConverter<?>> existingConverters = restTemplate.getMessageConverters();
 List<HttpMessageConverter<?>> newConverters = new ArrayList<>();
 newConverters.add(getHalMessageConverter());
 newConverters.addAll(existingConverters);
 restTemplate.setMessageConverters(newConverters);
 return restTemplate;
}

private HttpMessageConverter getHalMessageConverter() {
 ObjectMapper objectMapper = new ObjectMapper();
 objectMapper.registerModule(new Jackson2HalModule());
 MappingJackson2HttpMessageConverter halConverter = new TypeConstrainedMappingJackson2HttpMessageConverter(ResourceSupport.class);
 halConverter.setSupportedMediaTypes(Arrays.asList(HAL_JSON));
 halConverter.setObjectMapper(objectMapper);
 return halConverter;
}

The Jackson2HalModule is provided by the Spring HATEOAS project and understands the HAL representation.


Given this shiny new RestTemplate, first let us create a Teacher entity:

Teacher teacher1 = new Teacher();
teacher1.setName("Teacher 1");
teacher1.setDepartment("Department 1");
URI teacher1Uri =
  testRestTemplate.postForLocation("http://localhost:8080/api/teachers", teacher1);

Note that when the entity is created, the response is an HTTP status code of 201 with the Location header pointing to the URI of the newly created resource. Spring RestTemplate provides a neat way of posting and getting hold of this Location header through an API. So now we have a teacher1Uri representing the newly created teacher.

Given this teacher URI, let us now retrieve the teacher. The raw JSON for the teacher resource looks like the following:
{
  "name" : "Teacher 1",
  "department" : "Department 1",
  "version" : 0,
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/api/teachers/1"
    }
  }
}

and to retrieve this using RestTemplate:
ResponseEntity<Resource<Teacher>> teacherResponseEntity
  = testRestTemplate.exchange("http://localhost:8080/api/teachers/1", HttpMethod.GET, null, new ParameterizedTypeReference<Resource<Teacher>>() {
});

Resource<Teacher> teacherResource = teacherResponseEntity.getBody();

Link teacherLink = teacherResource.getLink("self");
String teacherUri = teacherLink.getHref();

Teacher teacher = teacherResource.getContent();

Jackson2HalModule is the one which helps unpack the links this cleanly and to get hold of the Teacher entity itself. I have previously explained ParameterizedTypeReference here.


Now to a trickier part: creating a Course.

Creating a course is tricky as it has a relation to the Teacher and representing this relation using HAL is not that straightforward. A raw POST to create the course would look like this:

 {
      "courseCode" : "Course1",
      "courseName" : "Course Name 1",
      "version" : 0,
      "teacher" : "http://localhost:8080/api/teachers/1"
}

Note how the reference to the teacher is a URI; this is how HAL represents an embedded reference specifically for POST'ed content. So now to get this form through RestTemplate:

First to create a Course:

Course course1 = new Course();
course1.setCourseCode("Course1");
course1.setCourseName("Course Name 1");

At this point, it will be easier to handle providing the teacher link by dealing with a json tree representation and adding in the teacher link as the teacher uri:

ObjectMapper objectMapper = getObjectMapperWithHalModule();
ObjectNode jsonNodeCourse1 = (ObjectNode) objectMapper.valueToTree(course1);
jsonNodeCourse1.put("teacher", teacher1Uri.getPath());

and posting this should create the course with the linked teacher:

URI course1Uri = testRestTemplate.postForLocation(coursesUri, jsonNodeCourse1);

and to retrieve this newly created Course:

ResponseEntity<Resource<Course>> courseResponseEntity
  = testRestTemplate.exchange(course1Uri, HttpMethod.GET, null, new ParameterizedTypeReference<Resource<Course>>() {
});

Resource<Course> courseResource = courseResponseEntity.getBody();
Link teacherLinkThroughCourse = courseResource.getLink("teacher");

This concludes how to use the RestTemplate to create and retrieve a linked resource. Alternate ideas are welcome.

If you are interested in exploring this further, the entire sample is available at this github repo -  and the test is here



Sunday, November 23, 2014

Externalizing session state for a Spring-boot application using spring-session

Spring-session is a very cool new project that aims to provide a simpler way of managing sessions in Java based web applications. One of the features that I explored with spring-session recently was the way it supports externalizing session state without needing to fiddle with the internals of specific web containers like Tomcat or Jetty.

To test spring-session I have used a shopping cart type application (available here) which makes heavy use of the session by keeping the items added to the cart as a session attribute, as can be seen from these screenshots:





Consider first a scenario without Spring session. So this is how I have exposed my application:


I am using nginx to load balance across two instances of this application. This set-up is very easy to run using Spring Boot: I brought up two instances of the app using two different server ports, this way:

mvn spring-boot:run -Dserver.port=8080
mvn spring-boot:run -Dserver.port=8082

and this is my nginx.conf to load balance across these two instances:

events {
    worker_connections  1024;
}
http {
    upstream sessionApp {
        server localhost:8080;
        server localhost:8082;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://sessionApp;
        }       
    }
}

I display the port number of the application in the footer just to show which instance is handling the request.

If I were to do nothing to move the state of the session out of the application, then the behavior of the application would be erratic, as the session established on one instance of the application would not be recognized by the other instance. Specifically, if Tomcat receives a session id it does not recognize, it creates a new session.

Introducing Spring session into the application


There are container-specific ways to introduce an external session store. One example is here, where Redis is configured as a store for Tomcat. Pivotal Gemfire provides a module to externalize Tomcat's session state.

The advantage of using Spring-session is that there is no dependence on the container at all - maintaining session state becomes an application concern. The instructions on configuring an application to use Spring session are detailed very well at the Spring-session site. Just to quickly summarize how I have configured my Spring Boot application, these are first the dependencies that I have pulled in:

<dependency>
 <groupId>org.springframework.session</groupId>
 <artifactId>spring-session</artifactId>
 <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
 <groupId>org.springframework.session</groupId>
 <artifactId>spring-session-data-redis</artifactId>
 <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
 <groupId>org.springframework.data</groupId>
 <artifactId>spring-data-redis</artifactId>
 <version>1.4.1.RELEASE</version>
</dependency>
<dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>2.4.1</version>
</dependency>



and my configuration to use Spring-session for session support, note the Spring Boot specific FilterRegistrationBean which is used to register the session repository filter:

import org.springframework.boot.context.embedded.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
import org.springframework.session.web.http.SessionRepositoryFilter;
import org.springframework.web.filter.DelegatingFilterProxy;

import java.util.Arrays;

@Configuration
@EnableRedisHttpSession
public class SessionRepositoryConfig {

 @Bean
 @Order(value = 0)
 public FilterRegistrationBean sessionRepositoryFilterRegistration(SessionRepositoryFilter springSessionRepositoryFilter) {
  FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
  filterRegistrationBean.setFilter(new DelegatingFilterProxy(springSessionRepositoryFilter));
  filterRegistrationBean.setUrlPatterns(Arrays.asList("/*"));
  return filterRegistrationBean;
 }

 @Bean
 public JedisConnectionFactory connectionFactory() {
  return new JedisConnectionFactory();
 }
}


And that is it! Magically, all session state is now handled by Spring-session and neatly externalized to Redis.

If I were to retry my previous configuration of using nginx to load balance two different Spring-Boot applications using the common Redis store, the application just works irrespective of the instance handling the request. I look forward to further enhancements to this excellent new project.
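The difference between the two set-ups can be sketched with a toy model in plain Java. It involves no Spring, Redis or servlet container: a HashMap stands in for the session store, and all class and method names are made up for illustration. It also simplifies the container behavior by letting an unknown session id start with an empty cart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SessionStoreDemo {

    // A stand-in for one application instance; the session store is handed in from outside.
    static class AppInstance {
        private final Map<String, List<String>> sessions;

        AppInstance(Map<String, List<String>> sessions) {
            this.sessions = sessions;
        }

        // Adds an item to the cart kept in the session and returns the cart.
        List<String> addToCart(String sessionId, String item) {
            List<String> cart = sessions.computeIfAbsent(sessionId, id -> new ArrayList<>());
            cart.add(item);
            return cart;
        }
    }

    // Each instance keeps sessions in its own memory, as in the set-up without Spring session.
    static int cartSizeWithLocalStores() {
        AppInstance first = new AppInstance(new HashMap<>());
        AppInstance second = new AppInstance(new HashMap<>());
        first.addToCart("session-1", "book");
        return second.addToCart("session-1", "pen").size();
    }

    // Both instances share one store, which plays the role of Redis here.
    static int cartSizeWithSharedStore() {
        Map<String, List<String>> shared = new HashMap<>();
        AppInstance first = new AppInstance(shared);
        AppInstance second = new AppInstance(shared);
        first.addToCart("session-1", "book");
        return second.addToCart("session-1", "pen").size();
    }

    public static void main(String[] args) {
        System.out.println("local stores: " + cartSizeWithLocalStores());
        System.out.println("shared store: " + cartSizeWithSharedStore());
    }
}
```

With local stores the second instance knows nothing about the item added through the first one; with the shared store the cart holds both items regardless of which instance served each request.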

The sample application which makes use of Spring-session is available here: https://github.com/bijukunjummen/shopping-cart-cf-app.git

Tuesday, November 11, 2014

Spring boot war packaging

Spring Boot recommends creating an executable jar with an embedded container (Tomcat or Jetty) during build time and using this executable jar as a standalone process at runtime. It is common, however, to deploy applications to an external container instead, and Spring Boot supports packaging the application as a war specifically for this kind of need.

My focus here is not to repeat the already detailed Spring Boot instructions on creating the war artifact, but on testing the created file to see if it would reliably work on a standalone container. I recently had an issue when creating a war from a Spring Boot project and deploying it on Jetty and this is essentially a learning from that experience.

The best way to test if the war will work reliably will be to simply use the jetty-maven and/or the tomcat maven plugin, with the following entries to the pom.xml file:

<plugin>
 <groupId>org.apache.tomcat.maven</groupId>
 <artifactId>tomcat7-maven-plugin</artifactId>
 <version>2.2</version>
</plugin>
<plugin>
 <groupId>org.eclipse.jetty</groupId>
 <artifactId>jetty-maven-plugin</artifactId>
 <version>9.2.3.v20140905</version>
</plugin>

With the plugins in place, starting up the war with the tomcat plugin:
mvn tomcat7:run

and with the jetty plugin:
mvn jetty:run

If there are any issues with the way the war has been created, they should surface at start-up time with these containers. For example, if I were to leave in the embedded Tomcat dependencies:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency> 

then when starting up the maven tomcat plugin, an error along these lines will show up:
java.lang.ClassCastException: org.springframework.web.SpringServletContainerInitializer cannot be cast to javax.servlet.ServletContainerInitializer

an indication of a servlet jar being packaged with the war file, fixed by specifying the scope as provided in the maven dependencies:
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-tomcat</artifactId>
 <scope>provided</scope>
</dependency> 

Why both the jetty and tomcat plugins? The reason is that I saw a difference in behavior, specifically with websocket support, with jetty as the runtime and not with tomcat. So consider the websocket dependencies, which are pulled in the following way:
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> 

This gave me an error when started up using the jetty runtime, and the fix again is to mark the underlying tomcat dependencies as provided. Replace the above with the following:

<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-websocket</artifactId>
</dependency>
<dependency>
 <groupId>org.apache.tomcat.embed</groupId>
 <artifactId>tomcat-embed-websocket</artifactId>
 <scope>provided</scope>
</dependency>
<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-messaging</artifactId>
</dependency>

So to conclude, a quick way to verify that the war file produced for a Spring Boot application will cleanly deploy to a container (at least tomcat and jetty) is to add the tomcat and jetty maven plugins and use these plugins to start the application up. Here is a sample project demonstrating this - https://github.com/bijukunjummen/spring-websocket-chat-sample.git

Wednesday, November 5, 2014

Spring boot based websocket application and capturing http session id

I was involved in a project recently where we needed to capture the http session id for a websocket request - the reason was to determine the number of websocket sessions utilizing the same underlying http session.

The way to do this is based on a sample utilizing the new spring-session module and is described here.

The trick to capturing the http session id is in understanding that before a websocket connection is established between the browser and the server, there is a handshake phase negotiated over http and the session id is passed to the server during this handshake phase.
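Because the handshake is an ordinary HTTP request, the session id usually travels in the Cookie header like on any other request. The sketch below picks it out of a raw header value. It assumes the default servlet session cookie name JSESSIONID, and the HandshakeCookie helper is hypothetical; a servlet container does this parsing for you.

```java
class HandshakeCookie {

    // Returns the value of the named cookie from a Cookie header, or null when it is absent.
    static String cookieValue(String cookieHeader, String name) {
        if (cookieHeader == null) {
            return null;
        }
        for (String pair : cookieHeader.split(";")) {
            String[] parts = pair.trim().split("=", 2);
            if (parts.length == 2 && parts[0].equals(name)) {
                return parts[1];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // A made-up Cookie header as it might accompany a websocket handshake request.
        String header = "theme=dark; JSESSIONID=ABC123";
        System.out.println(cookieValue(header, "JSESSIONID"));
    }
}
```

In the actual application there is no need to parse the header by hand, since the servlet request already exposes the session.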

Spring Websocket support provides a nice way to register a HandshakeInterceptor, which can be used to capture the http session id and store it as a session attribute that is exposed through the sub-protocol (typically STOMP) headers. First, this is the way to capture the session id and set it as an attribute:

public class HttpSessionIdHandshakeInterceptor implements HandshakeInterceptor {

 @Override
 public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
  if (request instanceof ServletServerHttpRequest) {
   ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
   HttpSession session = servletRequest.getServletRequest().getSession(false);
   if (session != null) {
    attributes.put("HTTPSESSIONID", session.getId());
   }
  }
  return true;
 }

 @Override
 public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
 }
}

And to register this HandshakeInterceptor with Spring Websocket support:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketDefaultConfig extends AbstractWebSocketMessageBrokerConfigurer {

 @Override
 public void configureMessageBroker(MessageBrokerRegistry config) {
  config.enableSimpleBroker("/topic/", "/queue/");
  config.setApplicationDestinationPrefixes("/app");
 }

 @Override
 public void registerStompEndpoints(StompEndpointRegistry registry) {
  registry.addEndpoint("/chat").withSockJS().setInterceptors(httpSessionIdHandshakeInterceptor());
 }

 @Bean
 public HttpSessionIdHandshakeInterceptor httpSessionIdHandshakeInterceptor() {
  return new HttpSessionIdHandshakeInterceptor();
 }

}

Now that the session id is a part of the STOMP headers, it can be grabbed from them. The following is a sample where it is grabbed when subscriptions are registered with the server:

@Component
public class StompSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {

 private static final Logger logger = LoggerFactory.getLogger(StompSubscribeEventListener.class);

 @Override
 public void onApplicationEvent(SessionSubscribeEvent sessionSubscribeEvent) {
  StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(sessionSubscribeEvent.getMessage());
  logger.info(headerAccessor.getSessionAttributes().get("HTTPSESSIONID").toString());
 }
}

or it can be grabbed from a controller method handling websocket messages as a MessageHeaders parameter:

@MessageMapping("/chats/{chatRoomId}")
public void handleChat(@Payload ChatMessage message, @DestinationVariable("chatRoomId") String chatRoomId, MessageHeaders messageHeaders, Principal user) {
 logger.info(messageHeaders.toString());
 this.simpMessagingTemplate.convertAndSend("/topic/chats." + chatRoomId, "[" + getTimestamp() + "]:" + user.getName() + ":" + message.getMessage());
}

Here is a complete working sample which implements this pattern.
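Once the http session id is available for each websocket session, counting the websocket sessions per http session is a matter of bookkeeping. The following is a minimal sketch of such a registry in plain Java. The WebSocketSessionRegistry class is hypothetical and not part of the linked sample; it would have to be called from connect and disconnect handling.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class WebSocketSessionRegistry {

    // http session id -> ids of the websocket sessions opened under it
    private final ConcurrentMap<String, Set<String>> byHttpSession = new ConcurrentHashMap<>();

    void register(String httpSessionId, String webSocketSessionId) {
        byHttpSession
                .computeIfAbsent(httpSessionId, id -> ConcurrentHashMap.newKeySet())
                .add(webSocketSessionId);
    }

    void unregister(String httpSessionId, String webSocketSessionId) {
        Set<String> ids = byHttpSession.get(httpSessionId);
        if (ids != null) {
            ids.remove(webSocketSessionId);
        }
    }

    // Number of websocket sessions currently tracked for the given http session.
    int count(String httpSessionId) {
        Set<String> ids = byHttpSession.get(httpSessionId);
        return ids == null ? 0 : ids.size();
    }

    public static void main(String[] args) {
        WebSocketSessionRegistry registry = new WebSocketSessionRegistry();
        registry.register("http-1", "ws-a");
        registry.register("http-1", "ws-b");
        System.out.println(registry.count("http-1"));
    }
}
```

A set is used per http session so that registering the same websocket session twice does not inflate the count.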

Friday, October 31, 2014

Spring Caching abstraction and Google Guava Cache

Spring provides great out-of-the-box support for caching expensive method calls. The caching abstraction is covered in great detail here.

My objective here is to cover one of the newer cache implementations that Spring provides with the 4.0+ versions of the framework: Google Guava Cache.

In brief, consider a service which has a few slow methods:

public class DummyBookService implements BookService {

 @Override
 public Book loadBook(String isbn) {
  // Slow method 1.

 }

 @Override
 public List<Book> loadBookByAuthor(String author) {
  // Slow method 2
 }

}

With Spring Caching abstraction, repeated calls with the same parameter can be sped up by an annotation on the method along these lines - here the result of loadBook is being cached in to a "book" cache and listing of books cached into another "books" cache:

public class DummyBookService implements BookService {

 @Override
 @Cacheable("book")
 public Book loadBook(String isbn) {
  // slow response time..

 }

 @Override
 @Cacheable("books")
 public List<Book> loadBookByAuthor(String author) {
  // Slow listing
 }
}
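Conceptually, the annotation wraps the method so that a repeated call with the same argument is answered from a cache instead of running the slow code again. The following is a rough sketch of that idea in plain Java. It is not how Spring implements the abstraction, the MemoizingLoader class is made up for illustration, and unlike a real cache it never evicts entries.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class MemoizingLoader<K, V> {

    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> slowLoader;

    MemoizingLoader(Function<K, V> slowLoader) {
        this.slowLoader = slowLoader;
    }

    // Runs the slow loader only the first time a key is seen; later calls hit the cache.
    V load(K key) {
        return cache.computeIfAbsent(key, slowLoader);
    }

    public static void main(String[] args) {
        AtomicInteger slowCalls = new AtomicInteger();
        MemoizingLoader<String, String> books = new MemoizingLoader<>(isbn -> {
            slowCalls.incrementAndGet();
            return "Book " + isbn;
        });
        books.load("123");
        books.load("123");
        System.out.println("slow calls: " + slowCalls.get());
    }
}
```

The method argument plays the role of the cache key, which matches the default behavior for a method with a single parameter.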

Now, the caching abstraction requires a CacheManager to be available, which is responsible for managing the underlying caches that store the cached results. With the new Guava Cache support, the CacheManager is along these lines:

@Bean
public CacheManager cacheManager() {
 return new GuavaCacheManager("books", "book");
}

Google Guava Cache provides a rich API to pre-load the cache, set eviction duration based on last access or creation time, set the size of the cache, and so on. If the cache is to be customized, a Guava CacheBuilder can be passed to the CacheManager for this customization:

@Bean
public CacheManager cacheManager() {
 GuavaCacheManager guavaCacheManager =  new GuavaCacheManager();
 guavaCacheManager.setCacheBuilder(CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES));
 return guavaCacheManager;
}

This works well if all the caches have a similar configuration. What if the caches need to be configured differently? For example, in the sample above I may want the "book" cache to never expire but the "books" cache to have an expiration of 30 minutes. In that case the GuavaCacheManager abstraction does not work well; a better solution is to use a SimpleCacheManager, which provides a more direct way to get to the cache and can be configured this way:

@Bean
public CacheManager cacheManager() {
 SimpleCacheManager simpleCacheManager = new SimpleCacheManager();
 GuavaCache cache1 = new GuavaCache("book", CacheBuilder.newBuilder().build());
 GuavaCache cache2 = new GuavaCache("books", CacheBuilder.newBuilder()
             .expireAfterAccess(30, TimeUnit.MINUTES)
             .build());
 simpleCacheManager.setCaches(Arrays.asList(cache1, cache2));
 return simpleCacheManager;
}

This approach works very nicely. If required, certain caches can be backed by different caching engines: some by a simple hash map, some by Guava or EhCache, and some by distributed caches like Gemfire.
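For readers unfamiliar with what "expire after access" means for the "books" cache, here is a small sketch of the policy in plain Java. It is a simplified stand-in and not Guava's implementation; the ExpireAfterAccessCache class and its injectable clock are assumptions made so the behavior can be shown without waiting 30 minutes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

class ExpireAfterAccessCache<K, V> {

    private static final class Entry<T> {
        final T value;
        long lastAccessMillis;

        Entry(T value, long lastAccessMillis) {
            this.value = value;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    ExpireAfterAccessCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    synchronized void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Returns the cached value, or null if it is missing or was not touched within the TTL.
    synchronized V get(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - entry.lastAccessMillis >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        // Every successful read pushes the expiry further out.
        entry.lastAccessMillis = now;
        return entry.value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        ExpireAfterAccessCache<String, String> books =
                new ExpireAfterAccessCache<>(30 * 60 * 1000L, () -> now[0]);
        books.put("author-1", "two books");
        now[0] = 31 * 60 * 1000L; // 31 minutes later without any access
        System.out.println(books.get("author-1"));
    }
}
```

An entry that keeps being read stays in the cache indefinitely, while one left untouched for the full duration is dropped on the next lookup.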


Wednesday, October 22, 2014

Docker RabbitMQ cluster

I have been trying to create a Docker based RabbitMQ cluster on and off for some time and got it working today. It is fairly basic and flaky, but could be a good starting point for others to improve on.

This is how the sample cluster looks on my machine. It is a typical cluster as described in the RabbitMQ clustering guide available here - https://www.rabbitmq.com/clustering.html. As recommended at the site, there are 2 disk based nodes and 1 RAM based node here.



To quickly replicate this, you only need to have fig on your machine. Just create a fig.yml file with the following entries:

rabbit1:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit1
  ports:
    - "5672:5672"
    - "15672:15672"

rabbit2:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit2
  links:
    - rabbit1
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
   - RAM_NODE=true

rabbit3:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit3
  links:
    - rabbit1
    - rabbit2
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   

and in the folder holding this file, run:

  fig up

That is it! The entire cluster should come up. If you need more nodes, just modify the fig.yml file.

The Docker files for creating the dockerized rabbitmq-server are available at my github repo here: https://github.com/bijukunjummen/docker-rabbitmq-cluster and the "rabbitmq-server" image itself is here at the docker hub.

References: