Saturday, March 14, 2015

Using rx-java Observable in a Spring MVC flow

Spring MVC has supported asynchronous request processing flow for sometime now and this support internally utilizes the Servlet 3 async support of containers like Tomcat/Jetty.

Spring Web Async support

Consider a service call that takes a little while to process, simulated with a delay:

public CompletableFuture<Message> getAMessageFuture() {
    return CompletableFuture.supplyAsync(() -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000);
        logger.info("End: Executing slow task in Service 1");
        return new Message("data 1");
    }, futureExecutor);
}

If I were to call this service in a user request flow, the traditional blocking controller flow would look like this:

@RequestMapping("/getAMessageFutureBlocking")
public Message getAMessageFutureBlocking() throws Exception {
    return service1.getAMessageFuture().get();
}

A better approach is to use the Spring Asynchronous support to return the result back to the user when available from the CompletableFuture, this way not holding up the containers thread:

@RequestMapping("/getAMessageFutureAsync")
public DeferredResult<Message> getAMessageFutureAsync() {
    DeferredResult<Message> deffered = new DeferredResult<>(90000);
    CompletableFuture<Message> f = this.service1.getAMessageFuture();
    f.whenComplete((res, ex) -> {
        if (ex != null) {
            deffered.setErrorResult(ex);
        } else {
            deffered.setResult(res);
        }
    });
    return deffered;
}

Using Observable in a Async Flow


Now to the topic of this article, I have been using Rx-java's excellent Observable type as my service return types lately and wanted to ensure that the web layer also remains asynchronous in processing the Observable type returned from a service call.

Consider the service that was described above now modified to return an Observable:

public Observable<Message> getAMessageObs() {
    return Observable.<Message>create(s -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000);
        s.onNext(new Message("data 1"));
        logger.info("End: Executing slow task in Service 1");
        s.onCompleted();
    }).subscribeOn(Schedulers.from(customObservableExecutor));
}

I can nullify all the benefits of returning an Observable by ending up with a blocking call at the web layer, a naive call will be the following:

@RequestMapping("/getAMessageObsBlocking")
public Message getAMessageObsBlocking() {
    return service1.getAMessageObs().toBlocking().first();
}

To make this flow async through the web layer, a better way to handle this call is the following, essentially by transforming Observable to Spring's DeferredResult type:

@RequestMapping("/getAMessageObsAsync")
public DeferredResult<Message> getAMessageAsync() {
    Observable<Message> o = this.service1.getAMessageObs();
    DeferredResult<Message> deffered = new DeferredResult<>(90000);
    o.subscribe(m -> deffered.setResult(m), e -> deffered.setErrorResult(e));
    return deffered;
}

This would ensure that the thread handling the user flow would return as soon as the service call is complete and the user response will be processed reactively once the observable starts emitting values.


If you are interested in exploring this further, here is a github repo with working samples: https://github.com/bijukunjummen/spring-web-observable.

References:

Spring's reference guide on async flows in the web tier: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html#mvc-ann-async

More details on Spring DeferredResult by the inimitable Tomasz Nurkiewicz at the NoBlogDefFound blog - http://www.nurkiewicz.com/2013/03/deferredresult-asynchronous-processing.html

Saturday, March 7, 2015

Netflix Archaius properties in a Spring project

Archaius Basics


Netflix Archaius is a library for managing configuration for an application. Consider a properties file "sample.properties" holding a property called "myprop":

myprop=myprop_value_default

This is how the file is loaded up using Archaius:

ConfigurationManager
                .loadCascadedPropertiesFromResources("sample");

String myProp = DynamicPropertyFactory.getInstance().getStringProperty("myprop", "NOT FOUND").get();

assertThat(myProp, equalTo("myprop_value_default"));

Archaius can load property appropriate to an environment, consider that there is a "sample-perf.properties" with the same configuration over-ridden for perf environment:


myprop=myprop_value_perf

Now Archaius can be instructed to load the configuration in a cascaded way by adding the following in sample.properties file:
myprop=myprop_value_default

@next=sample-${@environment}.properties

And the test would look like this:

ConfigurationManager.getDeploymentContext().setDeploymentEnvironment("perf");
ConfigurationManager
        .loadCascadedPropertiesFromResources("sample");

String myProp = DynamicPropertyFactory.getInstance().getStringProperty("myprop", "NOT FOUND").get();

assertThat(myProp, equalTo("myprop_value_perf"));

Spring Property basics


Spring property basics are very well explained at the Spring Framework reference site here. In short, if there is a property file "sample.properties", it can be loaded up and referenced the following way:

@Configuration
@PropertySource("classpath:/sample.properties")
public class AppConfig {
    @Autowired
    Environment env;

    @Bean
    public TestBean testBean() {
        TestBean testBean = new TestBean();
        testBean.setName(env.getProperty("myprop"));
        return testBean;
    }


}

Or even simpler, they can be de-referenced with placeholders this way:
@Configuration
@PropertySource("classpath:/sample.properties")
public class AppConfig {
    @Value("${myprop}")
    private String myProp;

    @Bean
    public TestBean testBean() {
        TestBean testBean = new TestBean();
        testBean.setName(myProp));
        return testBean;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}


Making Archaius properties visible to Spring


So now the question is how to get the Archaius properties visible in Spring, the approach I have taken is a little quick and dirty one but can be cleaned up to suite your needs. My approach is to define a Spring PropertySource which internally delegates to Archaius:

import com.netflix.config.ConfigurationManager;
import com.netflix.config.DynamicPropertyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.PropertySource;

import java.io.IOException;

public class SpringArchaiusPropertySource extends PropertySource<Void> {


    private static final Logger LOGGER = LoggerFactory.getLogger(SpringArchaiusPropertySource.class);


    public SpringArchaiusPropertySource(String name) {
        super(name);
        try {
            ConfigurationManager
                    .loadCascadedPropertiesFromResources(name);
        } catch (IOException e) {
            LOGGER.warn(
                    "Cannot find the properties specified : {}", name);
        }

    }

    @Override
    public Object getProperty(String name) {
         return DynamicPropertyFactory.getInstance().getStringProperty(name, null).get();
    }
}

The tricky part is registering this new PropertySource with Spring, this can be done using an ApplicationContextInitializer which is triggered before the application context is initialized:

import com.netflix.config.ConfigurationBasedDeploymentContext;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.util.StringUtils;

public class SpringProfileSettingApplicationContextInitializer
        implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(ConfigurableApplicationContext ctx) {
        ctx.getEnvironment()
                .getPropertySources()
                .addFirst(new SpringArchaiusPropertySource("samples"));
    }
}

And finally registering this new ApplicationContextInitializer with Spring is described here

This is essentially it, now the Netflix Archaius properties should work in a Spring application.

Sunday, March 1, 2015

Java 8 Stream to Rx-Java Observable

I was recently looking at a way to convert a Java 8 Stream to Rx-Java Observable.

There is one api in Observable that appears to do this :

public static final <T> Observable<T> from(java.lang.Iterable<? extends T> iterable)

So now the question is how do we transform a Stream to an Iterable. Stream does not implement the Iterable interface, and there are good reasons for this. So to return an Iterable from a Stream, you can do the following:

Iterable iterable = new Iterable() {
    @Override
    public Iterator iterator() {
        return aStream.iterator();
    }
};

Observable.from(iterable);

Since Iterable is a Java 8 functional interface, this can be simplified to the following using Java 8 Lambda expressions!:

Observable.from(aStream::iterator);

First look it does appear cryptic, however if it is seen as a way to simplify the expanded form of Iterable then it slowly starts to make sense.

Reference:
This is entirely based on what I read on this Stackoverflow question.

Sunday, February 22, 2015

Async abstractions using rx-java

One of the big benefits in using Rx-java for me has been the way the code looks exactly the same whether the underlying calls are synchronous or asynchronous and hence the title of this entry.

Consider a very simple use case of a client code making three slow running calls and combines the results into a list:

String op1 = service1.operation();
String op2 = service2.operation();
String op3 = service3.operation();
Arrays.asList(op1, op2, op3)

Since the calls are synchronous the time taken to do this would be additive. To simulate a slow call the following is the type of implementation in each of method calls:

public String operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return "operation1"
}

So the first attempt at using rx-java with these implementations is to simply have these long running operations return the versatile type Observable, a bad implementation would look like this:

public Observable<string> operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return Observable.just("operation 1");
}

So with this the caller implementation changes to the following:

Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();

Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();


See how the caller composes the results using the merge method.

However the calls to each of the service calls is still synchronous at this point, to make the call asynch the service calls can be made to use a Thread pool, the following way:

public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(7000);
            s.onNext("operation 1");
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}

subscribeOn uses the specified Scheduler to run the actual operation.

The beauty of the approach is that the calling code of this service is not changed at all, the implementation there remains exactly same as before whereas the service calls are now asynchronous. If you are interested in exploring this sample further, here is a github repo with working examples.

Thursday, February 12, 2015

Standing up a local Netflix Eureka

Here I will consider two different ways of standing up a local instance of Netflix Eureka. If you are not familiar with Eureka, it provides a central registry where (micro)services can register themselves and client applications can use this registry to look up specific instances hosting a service and to make the service calls.

Approach 1: Native Eureka Library

The first way is to simply use the archive file generated by the Netflix Eureka build process:

1. Clone the Eureka source repository here: https://github.com/Netflix/eureka
2. Run "./gradlew build" at the root of the repository, this should build cleanly generating a war file in eureka-server/build/libs folder
3. Grab this file, rename it to "eureka.war" and place it in the webapps folder of either tomcat or jetty. For this exercise I have used jetty.
4. Start jetty, by default jetty will boot up at port 8080, however I wanted to instead bring it up at port 8761, so you can start it up this way, "java -jar start.jar -Djetty.port=8761"

The server should start up cleanly and can be verified at this endpoint - "http://localhost:8761/eureka/v2/apps"


Approach 2: Spring-Cloud-Netflix


Spring-Cloud-Netflix provides a very neat way to bootstrap Eureka. To bring up Eureka server using Spring-Cloud-Netflix the approach that I followed was to clone the sample Eureka server application available here: https://github.com/spring-cloud-samples/eureka

1. Clone this repository
2. From the root of the repository run "mvn spring-boot:run", and that is it!.

The server should boot up cleanly and the REST endpoint should come up here: "http://localhost:8761/eureka/apps". As a bonus, Spring-Cloud-Netflix provides a neat UI showing the various applications who have registered with Eureka at the root of the webapp at "http://localhost:8761/".

Just a few small issues to be aware of, note that the context url's are a little different in the two cases "eureka/v2/apps" vs "eureka/apps", this can be adjusted on the configurations of the services which register with Eureka.

Conclusion


Your mileage with these approaches may vary. I have found Spring-Cloud-Netflix a little unstable at times but it has mostly worked out well for me. The documentation at the Spring-Cloud site is also far more exhaustive than the one provided at the Netflix Eureka site.

Thursday, February 5, 2015

Netflix Governator Tests - Introducing governator-junit-runner

Consider a typical Netflix Governator junit test.

public class SampleWithGovernatorJunitSupportTest {

    @Rule
    public LifecycleTester tester = new LifecycleTester();

    @Test
    public void testExampleBeanInjection() throws Exception {
        tester.start();
        Injector injector = tester
                .builder()
                .withBootstrapModule(new SampleBootstrapModule())
                .withModuleClass(SampleModule.class)
                .usingBasePackages("sample.gov")
                .build()
                .createInjector();

        BlogService blogService = injector.getInstance(BlogService.class);
        assertThat(blogService.get(1l), is(notNullValue()));
        assertThat(blogService.getBlogServiceName(), equalTo("Test Blog Service"));
    }

}

This test is leveraging the Junit rule support provided by Netflix Governator and tests some of the feature sets of Governator - Bootstrap modules, package scanning, configuration support etc.

The test however has quite a lot of boilerplate code which I felt could be reduced by instead leveraging a Junit Runner type model. As a proof of this concept, I am introducing the unimaginatively named project - governator-junit-runner, consider now the same test re-written using this library:

@RunWith(GovernatorJunit4Runner.class)
@LifecycleInjectorParams(modules = SampleModule.class, bootstrapModule = SampleBootstrapModule.class, scannedPackages = "sample.gov")
public class SampleGovernatorRunnerTest {

    @Inject
    private BlogService blogService;

    @Test
    public void testExampleBeanInjection() throws Exception {
        assertNotNull(blogService.get(1l));
        assertEquals("Test Blog Service", blogService.getBlogServiceName());
    }

}

Most of the boilerplate is now implemented within the Junit runner and the parameters required to bootstrap Governator is passed in through the LifecycleInjectorParams annotation. The test instance itself is a bound component and thus can be injected into, this way the instances which need to be tested can be injected into the test itself and asserted on. If you want more fine-grained control, the LifecycleManager itself can be injected into the test!:

@Inject
private Injector injector;

@Inject
private LifecycleManager lifecycleManager;

If this interests you, more samples are at the project site here.

Sunday, February 1, 2015

Disambiguating between instances with Google Guice

Google guice provides a neat way to select a target implementation if there are multiple implementations of an interface. My samples are based on an excellent article by Josh Long(@starbuxman) on a similar mechanism that Spring provides.

So, consider an interface called MarketPlace having two implementations, an AndroidMarketPlace and AppleMarketPlace:

interface MarketPlace {
}

class AppleMarketPlace implements MarketPlace {

    @Override
    public String toString() {
        return "apple";
    }
}

class GoogleMarketPlace implements MarketPlace {

    @Override
    public String toString() {
        return "android";
    }
}

and consider a user of these implementations:

class MarketPlaceUser {
    private final MarketPlace marketPlace;
    
    public MarketPlaceUser(MarketPlace marketPlace) {
        System.out.println("MarketPlaceUser constructor called..");
        this.marketPlace = marketPlace;
    }

    public String showMarketPlace() {
        return this.marketPlace.toString();
    }

}

A good way for MarketPlaceUser to disambiguate between these implementations is to use a guice feature called Binding Annotations. To make use of this feature, start by defining annotations for each of these implementations this way:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
@BindingAnnotation
@interface Android {}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
@BindingAnnotation
@interface Ios {}

and inform the Guice binder about these annotations and the appropriate implementation corresponding to the annotation:

class MultipleInstancesModule extends AbstractModule {

    @Override
    protected void configure() {
        bind(MarketPlace.class).annotatedWith(Ios.class).to(AppleMarketPlace.class).in(Scopes.SINGLETON);
        bind(MarketPlace.class).annotatedWith(Android.class).to(GoogleMarketPlace.class).in(Scopes.SINGLETON);
        bind(MarketPlaceUser.class).in(Scopes.SINGLETON);
    }
}

Now, if MarketPlaceUser needs to use one or the other implementation, this is how the dependency can be injected in:

import com.google.inject.*;

class MarketPlaceUser {
    private final MarketPlace marketPlace;

    @Inject
    public MarketPlaceUser(@Ios MarketPlace marketPlace) {
        this.marketPlace = marketPlace;
    }

}

This is very intuitive. If you have concerns about defining so many annotations, another approach could be to use @Named built-in Google Guice annotation, this way:

class MultipleInstancesModule extends AbstractModule {

    @Override
    protected void configure() {
        bind(MarketPlace.class).annotatedWith(Names.named("ios")).to(AppleMarketPlace.class).in(Scopes.SINGLETON);
        bind(MarketPlace.class).annotatedWith(Names.named("android")).to(GoogleMarketPlace.class).in(Scopes.SINGLETON);
        bind(MarketPlaceUser.class).in(Scopes.SINGLETON);
    }
}

and use it this way, where the dependency is required:

import com.google.inject.*;

class MarketPlaceUser {
    private final MarketPlace marketPlace;

    @Inject
    public MarketPlaceUser(@Named("ios") MarketPlace marketPlace) {
        this.marketPlace = marketPlace;
    }

}

If you are interested in exploring this further, here is the Google guice sample and an equivalent sample using Spring framework