Sunday, February 22, 2015

Async abstractions using rx-java

One of the big benefits of using Rx-java, for me, has been that the code looks exactly the same whether the underlying calls are synchronous or asynchronous, hence the title of this entry.

Consider a very simple use case: client code makes three slow-running calls and combines the results into a list:

String op1 = service1.operation();
String op2 = service2.operation();
String op3 = service3.operation();
List<String> results = Arrays.asList(op1, op2, op3);

Since the calls are synchronous, the total time taken is the sum of the three calls. To simulate a slow call, each service method is implemented along the following lines:

public String operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return "operation1";
}
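
To see the additive cost concretely, here is a minimal JDK-only sketch. The post does not show `Util.delay`, so the `delay` helper below is an assumption (a thin wrapper over `Thread.sleep`). The class name `SequentialCalls` is hypothetical, and the delay is shortened from 7000 ms to 200 ms so the example runs quickly.

```java
import java.util.Arrays;
import java.util.List;

public class SequentialCalls {

    // Assumed stand-in for the post's Util.delay: blocks the calling thread.
    static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Simulated slow service call (200 ms here, not the post's 7000 ms).
    static String operation(String result) {
        delay(200);
        return result;
    }

    // The synchronous caller: each call starts only after the previous one returns.
    static List<String> callAll() {
        String op1 = operation("operation1");
        String op2 = operation("operation2");
        String op3 = operation("operation3");
        return Arrays.asList(op1, op2, op3);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = callAll();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Elapsed time is roughly the sum of the three delays.
        System.out.println(results + " took about " + elapsedMs + " ms");
    }
}
```

With three calls of about 200 ms each, the caller waits roughly 600 ms in total, which is the behaviour the rest of the post sets out to improve.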

So the first attempt at using rx-java with these implementations is to simply have the long-running operations return the versatile Observable type. A bad implementation would look like this:

public Observable<String> operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return Observable.just("operation 1");
}

So with this the caller implementation changes to the following:

Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();

Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();

See how the caller composes the results using the merge method.

However, each of the service calls is still synchronous at this point. To make a call asynchronous, the service can be made to use a thread pool in the following way:

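public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(7000); // delay assumed here, as in the earlier versions
            s.onNext("operation 1");
            logger.info("End: Executing slow task in Service 1");
            // The listing is truncated here; the rest, including the scheduler choice, is assumed.
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}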

subscribeOn uses the specified Scheduler to run the actual operation.

The beauty of this approach is that the calling code is not changed at all: the implementation there remains exactly the same as before, whereas the service calls are now asynchronous. If you are interested in exploring this sample further, here is a GitHub repo with working examples.
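
The same idea can be tried without the rx-java dependency. The sketch below is not the post's code: it swaps in the JDK's `CompletableFuture` for `Observable` and a fixed thread pool for the Scheduler. All names (`AsyncAbstraction`, `blockingService`, `pooledService`, `callAll`) are hypothetical, and the delay is shortened to 200 ms. It is meant to show one caller running unchanged against both service styles, with only the elapsed time differing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class AsyncAbstraction {

    // Simulated slow work (stand-in for the post's Util.delay plus return value).
    static String slow(String result) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return result;
    }

    // "Bad" variant: the slow work runs on the caller's thread before wrapping.
    static CompletableFuture<String> blockingService(String name) {
        return CompletableFuture.completedFuture(slow(name));
    }

    // Async variant: returns immediately and runs the slow work on the pool.
    static CompletableFuture<String> pooledService(String name, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> slow(name), pool);
    }

    // The caller: identical no matter which service variant it is given.
    static List<String> callAll(Function<String, CompletableFuture<String>> service) {
        CompletableFuture<String> op1 = service.apply("operation 1");
        CompletableFuture<String> op2 = service.apply("operation 2");
        CompletableFuture<String> op3 = service.apply("operation 3");
        List<String> results = new ArrayList<>();
        results.add(op1.join());
        results.add(op2.join());
        results.add(op3.join());
        return results;
    }

    static long timeMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long blocking = timeMillis(() -> callAll(AsyncAbstraction::blockingService));
            long pooled = timeMillis(() -> callAll(name -> pooledService(name, pool)));
            System.out.println("blocking: " + blocking + " ms, pooled: " + pooled + " ms");
        } finally {
            pool.shutdown(); // pool threads are non-daemon; shut down so the JVM can exit
        }
    }
}
```

One difference from the rx-java version is worth keeping in mind: joining the futures in sequence keeps the results in call order, whereas `Observable.merge` emits items in the order they arrive.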


  1. Thank you for the attempt, keep up the great work.

  2. Hmm, that's not working for me with the 'Util.delay(7000)' line.
    The async 'operation()' executions are started but not completed :(

    It works OK only if the delay is removed or reduced to 10 milliseconds.
    But what is the reason, any idea?
    Is it due to any resource settings?