Thursday, April 23, 2015

Spring-session demonstration using docker-compose

I have earlier written about an exciting new project called Spring-session, which provides a clean way to externalize user sessions for Java-based web applications.

I managed to get a good demonstration set-up for spring-session using docker-compose which shows off the strengths of this project and I wanted to write about this here. In short, this is the set-up that running docker-compose will bring up:


Two instances of an application that uses Spring-session are started up. Both instances use the same redis container for storing the session state and are in turn fronted by an nginx server.

All that needs to be done to bring up this topology is to:

  • clone my repo available here
  • install docker-compose 
  • build the app - "mvn package -DskipTests" - skipping tests as the tests depend on a local redis-server, which may or may not be available
  • run "docker-compose up" in the cloned folder
That is it. If everything has been set up cleanly, nginx should be available at http://docker-ip (on my Mac, it is typically http://192.168.59.103).

Details and Demonstration:
Docker-compose is a tool providing a means to put together a set of docker containers into a coherent stack. The stack can be defined declaratively, and the following is a sample stack used here:

nginx:
  image: nginx
  volumes:
    - nginx:/etc/nginx:ro
  links:
    - shop1
    - shop2
  ports:
   - "80:80"

shop1:
  build: .
  hostname: shop1
  links:
    - redis
  ports:
    - "8081:8080"

shop2:
  build: .
  hostname: shop2
  links:
    - redis
  ports:
    - "8082:8080"

redis:
  image: redis
  hostname: redis
  ports:
    - "6379:6379"


The application itself uses the user session to maintain the state of a "shopping cart". Since the application is configured to use spring-session, the session is maintained in the redis database. There are two instances of the application behind nginx and either one may end up getting a request, but the externalized session state continues to work seamlessly irrespective of the instance handling the request.
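To make the idea concrete, here is a rough plain-Java sketch of why either instance can serve a request. It is not Spring-session code: the `SESSION_STORE` map is a hypothetical stand-in for the redis container, and `ShopInstance` is a made-up stand-in for the shop1/shop2 applications.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class SharedSessionSketch {
    // Hypothetical stand-in for the redis container: session id -> shopping cart items.
    static final Map<String, List<String>> SESSION_STORE = new ConcurrentHashMap<>();

    // Hypothetical stand-in for one application instance (shop1 or shop2).
    static final class ShopInstance {
        private final String hostname;

        ShopInstance(String hostname) {
            this.hostname = hostname;
        }

        // Adds an item to the cart kept in the shared store and reports which instance served the call.
        String addToCart(String sessionId, String item) {
            List<String> cart =
                    SESSION_STORE.computeIfAbsent(sessionId, id -> new CopyOnWriteArrayList<>());
            cart.add(item);
            return hostname + " sees " + cart;
        }
    }

    public static void main(String[] args) {
        ShopInstance shop1 = new ShopInstance("shop1");
        ShopInstance shop2 = new ShopInstance("shop2");
        System.out.println(shop1.addToCart("session-1", "book")); // shop1 sees [book]
        System.out.println(shop2.addToCart("session-1", "pen"));  // shop2 sees [book, pen]
    }
}
```

Because neither instance keeps the cart in its own memory, the second request sees the item added through the first instance.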

The following is the view of the shopping cart:


The session id and the details of the instance handling the request are printed at the bottom of the page.

As can be seen in the following screenshot, even if a different instance handles the request, the session state continues to be cleanly maintained.


Wednesday, April 8, 2015

Spring Enable* annotation - writing a custom Enable annotation

Spring provides a range of annotations with names starting with Enable*. These annotations, in essence, activate certain Spring-managed features. One good example is EnableWebMvc, which brings in all the beans needed to support an MVC flow in Spring-based applications. Another is EnableAsync, which activates the beans that support async functionality in Spring-based applications.

I was curious about how such annotations work and wanted to document my understanding. The way these annotations are supported can be considered part of the SPI and so may break if the internal implementation changes in future.

Simple Enable* Annotations

One way to think about these custom annotations is that they add a set of new beans to Spring's application context. Let us start by defining one such custom annotation:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface EnableSomeBeans {}



and apply this annotation on a Spring @Configuration class:

@Configuration
@EnableSomeBeans
public static class SpringConfig {}

Bringing in a set of beans when this annotation is applied is now as simple as pointing to them with the @Import annotation, this way:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(SomeBeanConfiguration.class)
@interface EnableSomeBeans {}

That is essentially it. If the imported @Configuration class defines any beans, they will now be part of the application context:

@Configuration
class SomeBeanConfiguration {

    @Bean
    public String aBean1() {
        return "aBean1";
    }

    @Bean
    public String aBean2() {
        return "aBean2";
    }
}

Here is a gist with a working sample.
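The underlying trick is that an annotation can itself carry an annotation, which a framework can discover through the annotated class. The following is a plain-Java sketch of that lookup using reflection only. It is not Spring's implementation, and the `Imports` meta-annotation is a hypothetical stand-in for Spring's @Import.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MetaAnnotationSketch {
    // Hypothetical stand-in for Spring's @Import; it may only be placed on other annotations.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.ANNOTATION_TYPE)
    @interface Imports {
        Class<?>[] value();
    }

    static class SomeBeanConfiguration {}

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Imports(SomeBeanConfiguration.class)
    @interface EnableSomeBeans {}

    @EnableSomeBeans
    static class SpringConfig {}

    // Walks the annotations on a class and collects what their @Imports meta-annotations point to.
    static List<Class<?>> importedBy(Class<?> configClass) {
        List<Class<?>> result = new ArrayList<>();
        for (Annotation annotation : configClass.getAnnotations()) {
            Imports imports = annotation.annotationType().getAnnotation(Imports.class);
            if (imports != null) {
                result.addAll(Arrays.asList(imports.value()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints a one-element list containing the SomeBeanConfiguration class.
        System.out.println(importedBy(SpringConfig.class));
    }
}
```

A container that knows the imported classes can then register them as configuration, which is roughly the role @Import plays for the Enable* annotations.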

Enable* Annotations with Selectors


Enable annotations can be far more complex, though: they can activate a different family of beans based on the context around them. An example of such an annotation is EnableCaching, which activates configuration based on the caching implementations available in the classpath.

Writing such Enable* annotations is a little more involved than the simpler example earlier. As before, start with a custom annotation:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(SomeBeanConfigurationSelector.class)
public @interface EnableSomeBeansSelector {
    String criteria() default "default";
}

Note that in this case the custom annotation has a sample field called criteria. What I want to do is activate two different sets of beans based on this criteria. This can be achieved using a @Configuration selector, which can return a different @Configuration class based on the context (in this instance, the value of the criteria field). The selector has a simple signature, and this is a sample implementation:

import org.springframework.context.annotation.ImportSelector;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.type.AnnotationMetadata;

public class SomeBeanConfigurationSelector implements ImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        AnnotationAttributes attributes =
                AnnotationAttributes.fromMap(
                        importingClassMetadata.getAnnotationAttributes(EnableSomeBeansSelector.class.getName(), false));
        String criteria = attributes.getString("criteria");
        if (criteria.equals("default")) {
            return new String[]{"enableannot.selector.SomeBeanConfigurationDefault"};
        }else {
            return new String[]{"enableannot.selector.SomeBeanConfigurationType1"};
        }
    }
}

@Configuration
class SomeBeanConfigurationType1 {

    @Bean
    public String aBean() {
        return "Type1";
    }

}

@Configuration
class SomeBeanConfigurationDefault {

    @Bean
    public String aBean() {
        return "Default";
    }

}

So if the criteria field is "default", the beans in "SomeBeanConfigurationDefault" get added in; otherwise the ones in "SomeBeanConfigurationType1" do.

Here is a gist with a working sample.

Conclusion

I hope this gives an appreciation for how Spring internally implements the @Enable* annotations. As an application developer you may not need to create such annotations yourself; a simpler mechanism is to use @Configuration classes and Spring bean profiles to compose applications.

Friday, March 27, 2015

Hot and cold rx-java Observable

My own understanding of Hot and Cold Observable is quite shaky, but here is what I have understood till now!

Cold Observable

Consider an API which returns an rx-java Observable:

import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(1000);
            s.onNext("data 1");
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}

Now, the first thing to note is that the typical Observable does not do anything until it is subscribed to.

So essentially if I were to do this:

Observable<String> op1 = service1.operation();

Nothing would be printed or returned, unless there is a subscription on the Observable this way:

Observable<String> op1 = service1.operation();

CountDownLatch latch = new CountDownLatch(1);

op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

latch.await();

So now, what happens if there are multiple subscriptions on this Observable:

Observable<String> op1 = service1.operation();

CountDownLatch latch = new CountDownLatch(3);

op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

op1.subscribe(s -> logger.info("From Subscriber 2: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

op1.subscribe(s -> logger.info("From Subscriber 3: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

latch.await();

With a cold Observable, the code inside the Observable is executed again for each subscriber and the items are emitted again. I get this on my machine:

06:04:07.206 [RxComputationThreadPool-2] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:04:07.208 [RxComputationThreadPool-3] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:04:08.211 [RxComputationThreadPool-2] INFO  o.b.BasicObservablesTest - From Subscriber 2: data 1
06:04:08.211 [RxComputationThreadPool-1] INFO  o.b.BasicObservablesTest - From Subscriber 1: data 1
06:04:08.211 [RxComputationThreadPool-3] INFO  o.b.BasicObservablesTest - From Subscriber 3: data 1
06:04:08.213 [RxComputationThreadPool-2] INFO  o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-1] INFO  o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-3] INFO  o.b.Service1 - End: Executing slow task in Service 1
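The "runs once per subscriber" behavior can be boiled down to a toy model in plain Java. This is not rx-java: `subscribe` here is a hypothetical static method that simply re-runs the producer for whoever subscribes, and `RUNS` is a counter added only for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ColdSourceSketch {
    // Counts how many times the producer body has been executed.
    static final AtomicInteger RUNS = new AtomicInteger();

    // A toy cold source: every subscription re-runs the producer for that subscriber alone.
    static void subscribe(Consumer<String> subscriber) {
        RUNS.incrementAndGet();      // the "slow task" starts over
        subscriber.accept("data 1"); // and the item is emitted again
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        subscribe(s -> received.add("Subscriber 1: " + s));
        subscribe(s -> received.add("Subscriber 2: " + s));
        subscribe(s -> received.add("Subscriber 3: " + s));
        System.out.println(RUNS.get() + " runs, received " + received); // 3 runs
    }
}
```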

Hot Observable - using ConnectableObservable


A hot Observable, on the other hand, does not really need a subscription to start emitting items. One way to implement a hot Observable is with a ConnectableObservable, which is an Observable that does not emit items until its connect method is called. Once it starts emitting, however, any subscriber gets items only from the point of subscription. So, revisiting the previous example, but with a ConnectableObservable instead:

Observable<String> op1 = service1.operation();

ConnectableObservable<String> connectableObservable =  op1.publish();

CountDownLatch latch = new CountDownLatch(3);

connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

connectableObservable.connect();

latch.await();

and the following gets printed:
06:07:23.852 [RxComputationThreadPool-3] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:07:24.860 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 1: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 2: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 3: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.Service1 - End: Executing slow task in Service 1

Hot Observable - using Subject

Another way to convert a cold Observable to a hot one is to use a Subject. Subjects behave both as an Observable and as an Observer, and there are different types of Subjects available with different behavior. Here I am using a PublishSubject, which has a Pub/Sub behavior: items get emitted to all the subscribers listening on it. With a PublishSubject introduced, the code looks like this:

Observable<String> op1 = service1.operation();

PublishSubject<String> publishSubject = PublishSubject.create();

op1.subscribe(publishSubject);

CountDownLatch latch = new CountDownLatch(3);

publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());

publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());


latch.await();

See how the PublishSubject is introduced as a subscriber to the Observable and the other subscribers subscribe to the PublishSubject instead. The output will be similar to the one from ConnectableObservable.
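The Pub/Sub behavior, and in particular what a late subscriber sees, can be sketched with a toy subject in plain Java. This is only an illustration of the idea and not rx-java's PublishSubject: `ToyPublishSubject` is a made-up class, it is not thread-safe, and it has no completion or error signals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ToyPublishSubject {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Registers a subscriber; it only sees items pushed from now on.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Pushes an item to whoever is subscribed right now; nothing is replayed later.
    void onNext(String item) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    public static void main(String[] args) {
        ToyPublishSubject subject = new ToyPublishSubject();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.onNext("data 1");
        subject.subscribe(late::add); // subscribes after "data 1" was emitted
        subject.onNext("data 2");

        System.out.println("early=" + early + ", late=" + late);
        // early=[data 1, data 2], late=[data 2]
    }
}
```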

This is essentially the extent of my understanding of hot Observables. To conclude, the difference between a cold and a hot Observable is about when items are emitted and which items a subscriber gets. With a cold Observable, items are emitted when it is subscribed to, and each subscriber typically gets all the emitted items. With a hot Observable, items are emitted even without a subscriber, and a subscriber typically gets only the items emitted after the point of subscription.


Reference

1. http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
2. Excellent javadoc on rx-java - http://reactivex.io/RxJava/javadoc/index.html

Saturday, March 14, 2015

Using rx-java Observable in a Spring MVC flow

Spring MVC has supported an asynchronous request processing flow for some time now, and this support internally uses the Servlet 3 async support of containers like Tomcat/Jetty.

Spring Web Async support

Consider a service call that takes a little while to process, simulated with a delay:

public CompletableFuture<Message> getAMessageFuture() {
    return CompletableFuture.supplyAsync(() -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000);
        logger.info("End: Executing slow task in Service 1");
        return new Message("data 1");
    }, futureExecutor);
}

If I were to call this service in a user request flow, the traditional blocking controller flow would look like this:

@RequestMapping("/getAMessageFutureBlocking")
public Message getAMessageFutureBlocking() throws Exception {
    return service1.getAMessageFuture().get();
}

A better approach is to use Spring's asynchronous support to return the result to the user when it becomes available from the CompletableFuture, this way not holding up the container's thread:

@RequestMapping("/getAMessageFutureAsync")
public DeferredResult<Message> getAMessageFutureAsync() {
    // The timeout is in milliseconds
    DeferredResult<Message> deferred = new DeferredResult<>(90000);
    CompletableFuture<Message> f = this.service1.getAMessageFuture();
    f.whenComplete((res, ex) -> {
        if (ex != null) {
            deferred.setErrorResult(ex);
        } else {
            deferred.setResult(res);
        }
    });
    return deferred;
}
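The callback pattern itself can be tried out with the JDK alone. In the sketch below an `AtomicReference` plays the role of DeferredResult, purely as a stand-in I made up for the demo, and the delay is shortened. The method that registers the callback returns straight away, and the holder is filled in later by the thread that completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DeferredCallbackSketch {
    // Simulates the slow service call (delay shortened for the demo).
    static CompletableFuture<String> getAMessageFuture() {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return "data 1";
        });
    }

    // Registers a callback and returns immediately; the holder stands in for DeferredResult.
    static AtomicReference<String> handleAsync(CountDownLatch completed) {
        AtomicReference<String> holder = new AtomicReference<>();
        getAMessageFuture().whenComplete((res, ex) -> {
            holder.set(ex != null ? "error: " + ex : res);
            completed.countDown();
        });
        return holder;
    }

    // Waits for the callback to fire and returns what it stored (null if it did not fire in time).
    static String resultAfterCompletion() {
        CountDownLatch completed = new CountDownLatch(1);
        AtomicReference<String> holder = handleAsync(completed);
        try {
            completed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holder.get();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("after completion: " + resultAfterCompletion());
    }
}
```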

Using Observable in a Async Flow


Now to the topic of this article. I have been using Rx-java's excellent Observable type as my service return type lately, and wanted to ensure that the web layer also remains asynchronous when processing the Observable returned from a service call.

Consider the service that was described above now modified to return an Observable:

public Observable<Message> getAMessageObs() {
    return Observable.<Message>create(s -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000);
        s.onNext(new Message("data 1"));
        logger.info("End: Executing slow task in Service 1");
        s.onCompleted();
    }).subscribeOn(Schedulers.from(customObservableExecutor));
}

I can nullify all the benefits of returning an Observable by ending up with a blocking call at the web layer. A naive call would be the following:

@RequestMapping("/getAMessageObsBlocking")
public Message getAMessageObsBlocking() {
    return service1.getAMessageObs().toBlocking().first();
}

To make this flow async through the web layer, a better way to handle this call is the following, essentially by transforming Observable to Spring's DeferredResult type:

@RequestMapping("/getAMessageObsAsync")
public DeferredResult<Message> getAMessageAsync() {
    Observable<Message> o = this.service1.getAMessageObs();
    // The timeout is in milliseconds
    DeferredResult<Message> deferred = new DeferredResult<>(90000);
    o.subscribe(m -> deferred.setResult(m), e -> deferred.setErrorResult(e));
    return deferred;
}

This ensures that the thread handling the user request returns immediately, without waiting for the service call to complete, and the user response is processed reactively once the Observable emits a value.


If you are interested in exploring this further, here is a github repo with working samples: https://github.com/bijukunjummen/spring-web-observable.

References:

Spring's reference guide on async flows in the web tier: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html#mvc-ann-async

More details on Spring DeferredResult by the inimitable Tomasz Nurkiewicz at the NoBlogDefFound blog - http://www.nurkiewicz.com/2013/03/deferredresult-asynchronous-processing.html

Saturday, March 7, 2015

Netflix Archaius properties in a Spring project

Archaius Basics


Netflix Archaius is a library for managing configuration for an application. Consider a properties file "sample.properties" holding a property called "myprop":

myprop=myprop_value_default

This is how the file is loaded up using Archaius:

ConfigurationManager
                .loadCascadedPropertiesFromResources("sample");

String myProp = DynamicPropertyFactory.getInstance().getStringProperty("myprop", "NOT FOUND").get();

assertThat(myProp, equalTo("myprop_value_default"));

Archaius can load properties appropriate to an environment. Consider that there is a "sample-perf.properties" with the same property overridden for the perf environment:


myprop=myprop_value_perf

Now Archaius can be instructed to load the configuration in a cascaded way by adding the following in sample.properties file:
myprop=myprop_value_default

@next=sample-${@environment}.properties

And the test would look like this:

ConfigurationManager.getDeploymentContext().setDeploymentEnvironment("perf");
ConfigurationManager
        .loadCascadedPropertiesFromResources("sample");

String myProp = DynamicPropertyFactory.getInstance().getStringProperty("myprop", "NOT FOUND").get();

assertThat(myProp, equalTo("myprop_value_perf"));
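What "cascaded" means here can be sketched with plain java.util.Properties: load the base file first, then let the environment-specific file override it. This is only an illustration of the override order and not how Archaius is implemented. The `cascade` helper is hypothetical and takes the file contents as strings to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class CascadeSketch {
    // Loads the base text, then the environment text; a later load wins on duplicate keys.
    static Properties cascade(String baseText, String envText) {
        Properties merged = new Properties();
        try {
            merged.load(new StringReader(baseText));
            merged.load(new StringReader(envText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return merged;
    }

    public static void main(String[] args) {
        String base = "myprop=myprop_value_default\n";
        String perf = "myprop=myprop_value_perf\n";

        System.out.println(cascade(base, "").getProperty("myprop"));   // myprop_value_default
        System.out.println(cascade(base, perf).getProperty("myprop")); // myprop_value_perf
    }
}
```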

Spring Property basics


Spring property basics are very well explained at the Spring Framework reference site here. In short, if there is a property file "sample.properties", it can be loaded up and referenced the following way:

@Configuration
@PropertySource("classpath:/sample.properties")
public class AppConfig {
    @Autowired
    Environment env;

    @Bean
    public TestBean testBean() {
        TestBean testBean = new TestBean();
        testBean.setName(env.getProperty("myprop"));
        return testBean;
    }


}

Or even simpler, they can be de-referenced with placeholders this way:
@Configuration
@PropertySource("classpath:/sample.properties")
public class AppConfig {
    @Value("${myprop}")
    private String myProp;

    @Bean
    public TestBean testBean() {
        TestBean testBean = new TestBean();
        testBean.setName(myProp);
        return testBean;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}


Making Archaius properties visible to Spring


So now the question is how to make the Archaius properties visible in Spring. The approach I have taken is a little quick and dirty, but it can be cleaned up to suit your needs. My approach is to define a Spring PropertySource which internally delegates to Archaius:

import com.netflix.config.ConfigurationManager;
import com.netflix.config.DynamicPropertyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.PropertySource;

import java.io.IOException;

public class SpringArchaiusPropertySource extends PropertySource<Void> {


    private static final Logger LOGGER = LoggerFactory.getLogger(SpringArchaiusPropertySource.class);


    public SpringArchaiusPropertySource(String name) {
        super(name);
        try {
            ConfigurationManager
                    .loadCascadedPropertiesFromResources(name);
        } catch (IOException e) {
            LOGGER.warn(
                    "Cannot find the properties specified : {}", name);
        }

    }

    @Override
    public Object getProperty(String name) {
         return DynamicPropertyFactory.getInstance().getStringProperty(name, null).get();
    }
}

The tricky part is registering this new PropertySource with Spring. This can be done using an ApplicationContextInitializer, which is triggered before the application context is initialized:

import com.netflix.config.ConfigurationBasedDeploymentContext;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.StringUtils;

public class SpringProfileSettingApplicationContextInitializer
        implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(ConfigurableApplicationContext ctx) {
        ctx.getEnvironment()
                .getPropertySources()
                .addFirst(new SpringArchaiusPropertySource("samples"));
    }
}

And finally, registering this new ApplicationContextInitializer with Spring is described here.

This is essentially it, now the Netflix Archaius properties should work in a Spring application.

Sunday, March 1, 2015

Java 8 Stream to Rx-Java Observable

I was recently looking at a way to convert a Java 8 Stream to Rx-Java Observable.

There is one API in Observable that appears to do this:

public static final <T> Observable<T> from(java.lang.Iterable<? extends T> iterable)

So now the question is how do we transform a Stream to an Iterable. Stream does not implement the Iterable interface, and there are good reasons for this. So to return an Iterable from a Stream, you can do the following:

Iterable iterable = new Iterable() {
    @Override
    public Iterator iterator() {
        return aStream.iterator();
    }
};

Observable.from(iterable);

Since Iterable has a single abstract method, it can be treated as a Java 8 functional interface, and this can be simplified to the following using a Java 8 method reference:

Observable.from(aStream::iterator);

At first look it does appear cryptic; however, if it is seen as a shorthand for the expanded form of Iterable, it slowly starts to make sense.
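The method-reference form can be checked with the JDK alone, without rx-java. The sketch below (class and method names are mine) iterates a Stream through an Iterable created this way, and also shows a caveat worth keeping in mind: a Stream can be consumed only once, so asking the same Iterable for a second iterator fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class StreamAsIterable {
    // Iterable's only abstract method is iterator(), so aStream::iterator fits its shape.
    static List<String> drain(Stream<String> aStream) {
        Iterable<String> iterable = aStream::iterator;
        List<String> out = new ArrayList<>();
        for (String s : iterable) {
            out.add(s);
        }
        return out;
    }

    // Returns true if asking for a second iterator fails because the stream was already used.
    static boolean secondPassFails(Stream<String> aStream) {
        Iterable<String> iterable = aStream::iterator;
        iterable.iterator(); // the first call claims the stream
        try {
            iterable.iterator();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(drain(Stream.of("a", "b", "c")));           // [a, b, c]
        System.out.println(secondPassFails(Stream.of("a", "b", "c"))); // true
    }
}
```

So an Iterable built this way is good for a single pass; anything that iterates it more than once would need a fresh Stream each time.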

Reference:
This is entirely based on what I read on this Stackoverflow question.

Sunday, February 22, 2015

Async abstractions using rx-java

One of the big benefits in using Rx-java for me has been the way the code looks exactly the same whether the underlying calls are synchronous or asynchronous and hence the title of this entry.

Consider a very simple use case of client code that makes three slow-running calls and combines the results into a list:

String op1 = service1.operation();
String op2 = service2.operation();
String op3 = service3.operation();
Arrays.asList(op1, op2, op3);

Since the calls are synchronous, the time taken to do this would be additive. To simulate a slow call, the following is the type of implementation in each of the method calls:

public String operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return "operation1";
}

So the first attempt at using rx-java with these implementations is to simply have these long-running operations return the versatile Observable type. A bad implementation would look like this:

public Observable<String> operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return Observable.just("operation 1");
}

So with this the caller implementation changes to the following:

Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();

Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();


See how the caller composes the results using the merge method.

However, each of the service calls is still synchronous at this point. To make the calls asynchronous, the service calls can be made to use a thread pool, the following way:

public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(7000);
            s.onNext("operation 1");
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}

subscribeOn uses the specified Scheduler to run the actual operation.

The beauty of the approach is that the calling code of this service is not changed at all: the implementation there remains exactly the same as before, whereas the service calls are now asynchronous. If you are interested in exploring this sample further, here is a github repo with working examples.
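The same "caller does not change" property can be tried out with the JDK's CompletableFuture, which I am using here only as an analogy and not as a replacement for rx-java. All names in the sketch are made up for the demo and the delay is shortened. The `combine` method is identical for both flavours of the service call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SameCallerSketch {
    // Simulates a slow call (delay shortened for the demo).
    static String slow(String value) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // Synchronous flavour: the work is finished before the future is even returned.
    static CompletableFuture<String> syncOperation(String value) {
        return CompletableFuture.completedFuture(slow(value));
    }

    // Asynchronous flavour: the work runs on the common pool and the future is returned at once.
    static CompletableFuture<String> asyncOperation(String value) {
        return CompletableFuture.supplyAsync(() -> slow(value));
    }

    // The caller composes the results the same way in both cases.
    static List<String> combine(CompletableFuture<String> a,
                                CompletableFuture<String> b,
                                CompletableFuture<String> c) {
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : Arrays.asList(a, b, c)) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(combine(syncOperation("operation 1"),
                syncOperation("operation 2"), syncOperation("operation 3")));
        System.out.println(combine(asyncOperation("operation 1"),
                asyncOperation("operation 2"), asyncOperation("operation 3")));
    }
}
```

With the synchronous flavour the delays add up before `combine` is even entered. With the asynchronous flavour the three calls can overlap, provided the pool has enough threads to run them in parallel.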