Thursday, January 9, 2014

Spring Integration Publisher

Consider a hypothetical requirement. You have a service class in your application and want to capture some information around the calls to this service:

public class SampleBean {
 private static final Logger logger = LoggerFactory.getLogger(SampleBean.class);

 public Response call(Request request) {
  logger.info(" invoked");
  return new Response(true);
 }
}

AOP is a great fit for such a requirement. It allows the information around a method call (a pointcut) to be cleanly captured and some processing (an advice) to be done with this information:

@Aspect
public class AuditAspect {
 private static final Logger logger = LoggerFactory.getLogger(AuditAspect.class);

 @Pointcut("execution( * && args(r)")
 public void beanCalls(Request r){}

 // Around advice: log the request, invoke the target method, then log the response
 @Around("beanCalls(r)")
 public Object auditCalls(ProceedingJoinPoint pjp, Request r) {
  logger.info("Capturing request: " + r);
  try {
   Object response = pjp.proceed();
   logger.info("Capturing response: " + response);
   return response;
  } catch (Throwable e) {
   throw new RuntimeException(e);
  }
 }
}

This appears to be good enough. Now, what if I wanted to return the response to the client immediately but continue to process the context of the method call? We can run the advice logic in a separate thread using a thread pool. Let me add another layer of complexity: what if we wanted to absolutely ensure that the context is not lost? A good way to do this is to keep the context of the method call outside the JVM, and messaging providers like RabbitMQ and ActiveMQ fit this very well.
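To make the thread pool idea concrete, here is a minimal plain-JDK sketch. It is not the aspect from this post: the class AsyncAuditSketch and its methods callAndAudit and drain are hypothetical names, and the "audit" is just an entry added to an in-memory list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical stand-in for the advice: the audit work is handed to a thread
// pool so the caller gets its response without waiting for the audit to finish.
class AsyncAuditSketch {
    private final ExecutorService auditPool = Executors.newFixedThreadPool(2);
    private final List<String> auditLog = new CopyOnWriteArrayList<>();

    // Invokes the service on the caller's thread, then queues the audit step.
    <Q, R> R callAndAudit(Function<Q, R> service, Q request) {
        R response = service.apply(request);
        auditPool.submit(() -> auditLog.add("request=" + request + ", response=" + response));
        return response;
    }

    // Stops accepting work, waits for queued audit tasks, returns the entries.
    List<String> drain() {
        auditPool.shutdown();
        try {
            auditPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return auditLog;
    }

    public static void main(String[] args) {
        AsyncAuditSketch sketch = new AsyncAuditSketch();
        String response = sketch.callAndAudit(req -> "ok:" + req, "r1");
        System.out.println(response);
        System.out.println(sketch.drain());
    }
}
```

Note that the queued audit tasks live only in the JVM's memory here, so they are gone if the process dies before the pool gets to them. That is exactly the gap the second requirement is about.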

Considering these additional requirements, a simpler solution, especially with messaging scenarios coming into play, is to use Spring Integration. Let us start by defining a new Spring Integration application context:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns=""


    <annotation-config/>

    <channel id="tobeprocessedlater"/>

    <logging-channel-adapter channel="tobeprocessedlater" log-full-message="true"/>

</beans:beans>


It just has the definition of a channel and an outbound adapter which reads from this channel and logs the full message. To capture the context of the call to SampleBean, a @Publisher annotation can be added to the relevant methods of SampleBean, which will direct "stuff" to the channel named in the annotation.

public class SampleBean {
 private static final Logger logger = LoggerFactory.getLogger(SampleBean.class);

 @Publisher(channel = "tobeprocessedlater")
 public Response call(@Header("request") Request request) {
  logger.info(" invoked");
  return new Response(true);
 }
}

What "stuff" is sent to this "tobeprocessedlater" channel is specified through additional annotations. By default the return value of the method is sent to the channel. Additionally, I have tagged the request with the @Header annotation, which causes the request to be sent as a header of the response message. Just for completeness, the integration context has an <annotation-config/> tag; this tag registers the relevant components that look for the @Publisher annotation and weave in the additional action to be performed when they find one.

If this code is executed now, the output will be along these lines:

                       core.SampleBean - invoked
o.s.integration.handler.LoggingHandler - [Payload=Response{success=true}][Headers={request=RequestType1{attr='null'}, id=52997b10-dc8e-e0eb-a82a-88c1df68fca5, timestamp=1389268390212}]
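
The logged message above shows the shape of what gets published: the payload is the method's return value and the request travels as a header named "request". A rough plain-Java model of that behavior follows. It is only a sketch: PublisherSketch, its Message class and the list standing in for the channel are hypothetical, not Spring Integration types, and the real framework also adds headers such as id and timestamp.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of what a publishing interceptor does around a method call.
class PublisherSketch {
    // Illustrative message shape: a payload plus named headers.
    static final class Message {
        final Object payload;
        final Map<String, Object> headers;

        Message(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Stand-in for the "tobeprocessedlater" channel.
    final List<Message> channel = new ArrayList<>();

    // Runs the target method, then publishes its return value as the payload
    // and the argument as a header named "request".
    <Q, R> R invokeAndPublish(Function<Q, R> target, Q request) {
        R result = target.apply(request);
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("request", request);
        channel.add(new Message(result, headers));
        return result;
    }

    public static void main(String[] args) {
        PublisherSketch sketch = new PublisherSketch();
        sketch.invokeAndPublish(req -> "Response{success=true}", "RequestType1");
        Message m = sketch.channel.get(0);
        System.out.println("payload=" + m.payload + ", headers=" + m.headers);
    }
}
```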

Now, to layer in the first requirement: handling the advice (in this case the logging) in a separate thread of execution.

This can be done with just a configuration change! Instead of publishing the message to a direct channel, publish it to a channel type that can buffer messages or use an executor to dispatch messages. I have opted to use an executor channel in this example:

    <channel id="tobeprocessedlater">
        <dispatcher task-executor="taskExecutor"/>
    </channel>
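
The difference between the two dispatch styles can be seen with a small plain-JDK sketch. It is a simplified model, not the Spring Integration implementation: DispatchSketch and handlerThreadFor are hypothetical names, and the "channel" is reduced to an Executor that hands a message to one subscriber.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified model of a channel: a message is handed to a single
// subscriber through an Executor.
class DispatchSketch {

    // Sends a message through the given executor and returns the name of the
    // thread on which the subscriber actually ran.
    static String handlerThreadFor(Executor dispatcher, String message) {
        AtomicReference<String> handlerThread = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        Consumer<String> subscriber = msg -> {
            handlerThread.set(Thread.currentThread().getName());
            handled.countDown();
        };
        dispatcher.execute(() -> subscriber.accept(message));
        try {
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handlerThread.get();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        // "Direct" dispatch: the subscriber runs on the sender's thread.
        Executor direct = Runnable::run;
        System.out.println(caller.equals(handlerThreadFor(direct, "m1")));

        // Executor-backed dispatch: the subscriber runs on a pool thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(caller.equals(handlerThreadFor(pool, "m2")));
        pool.shutdown();
    }
}
```

With direct dispatch the subscriber runs on the caller's thread, so the caller waits for it. With the pool, the handler runs elsewhere and the caller is free to return.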

Now to add the requirement of making the async message processing a little more reliable by publishing to an external messaging provider (and processing the messages later). Let me demonstrate this by publishing the messages to RabbitMQ; the change is again pure configuration and nothing in the Java code changes:

    <channel id="tobeprocessedlater"/>

    <int-amqp:outbound-channel-adapter amqp-template="amqpTemplate" channel="tobeprocessedlater"  />

The messaging sink could have been anything (a database, a file system, ActiveMQ), and the only change required would again be pure configuration.
