Tuesday, June 23, 2015

Rx-java subscribeOn and observeOn

If you have been confused by Rx-java Observable subscribeOn and observeOn, one of the blog articles that helped me understand these operations is this one by Graham Lea. I wanted to recreate a very small part of the article here, so consider a service which emits values every 200 millseconds:



package obs.threads;

import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class GeneralService {
    private static final Logger logger = LoggerFactory.getLogger(GeneralService.class);
    public Observable<String> getData() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing a Service");
            for (int i = 1; i <= 3; i++) {
                Util.delay(200);
                logger.info("Emitting {}", "root " + i);
                s.onNext("root " + i);
            }
            logger.info("End: Executing a Service");
            s.onCompleted();
        });
    }
}

Now, if I were to subscribe to this service, this way:

@Test
public void testThreadedObservable1() throws Exception {
    Observable<String> ob1 = aService.getData();

    CountDownLatch latch = new CountDownLatch(1);

    ob1.subscribe(s -> {
        Util.delay(500);
        logger.info("Got {}", s);
    }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

    latch.await();
}

All of the emissions and subscriptions will act on the main thread and something along the following lines will be printed:

20:53:29.380 [main] INFO  o.t.GeneralService - Start: Executing a Service
20:53:29.587 [main] INFO  o.t.GeneralService - Emitting root 1
20:53:30.093 [main] INFO  o.t.ThreadedObsTest - Got root 1
20:53:30.298 [main] INFO  o.t.GeneralService - Emitting root 2
20:53:30.800 [main] INFO  o.t.ThreadedObsTest - Got root 2
20:53:31.002 [main] INFO  o.t.GeneralService - Emitting root 3
20:53:31.507 [main] INFO  o.t.ThreadedObsTest - Got root 3
20:53:31.507 [main] INFO  o.t.GeneralService - End: Executing a Service

By default the emissions are not asynchronous in nature. So now, what is the behavior if subscribeOn is used:

public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
    private ExecutorService executor1 = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

 @Test
 public void testSubscribeOn() throws Exception {
        Observable<String> ob1 = aService.getData();

        CountDownLatch latch = new CountDownLatch(1);

        ob1.subscribeOn(Schedulers.from(executor1)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}

Here I am using Guava's ThreadFactoryBuilder to give each thread in the threadpool a unique name pattern, if I were to execute this code, the output will be along these lines:

20:56:47.117 [SubscribeOn-0] INFO  o.t.GeneralService - Start: Executing a Service
20:56:47.322 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 1
20:56:47.828 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 1
20:56:48.032 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 2
20:56:48.535 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 2
20:56:48.740 [SubscribeOn-0] INFO  o.t.GeneralService - Emitting root 3
20:56:49.245 [SubscribeOn-0] INFO  o.t.ThreadedObsTest - Got root 3
20:56:49.245 [SubscribeOn-0] INFO  o.t.GeneralService - End: Executing a Service

Now, the execution has moved away from the main thread and the emissions and the subscriptions are being processed in the threads borrowed from the threadpool.

And what happens if observeOn is used:
public class ThreadedObsTest {
    private GeneralService aService = new GeneralService();

    private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
    private ExecutorService executor1 = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

 @Test
 public void testObserveOn() throws Exception {
        Observable<String> ob1 = aService.getData();

        CountDownLatch latch = new CountDownLatch(1);

        ob1.observeOn(Schedulers.from(executor2)).subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());

        latch.await();
    }
}

the output is along these lines:

21:03:08.655 [main] INFO  o.t.GeneralService - Start: Executing a Service
21:03:08.860 [main] INFO  o.t.GeneralService - Emitting root 1
21:03:09.067 [main] INFO  o.t.GeneralService - Emitting root 2
21:03:09.268 [main] INFO  o.t.GeneralService - Emitting root 3
21:03:09.269 [main] INFO  o.t.GeneralService - End: Executing a Service
21:03:09.366 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 1
21:03:09.872 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 2
21:03:10.376 [ObserveOn-1] INFO  o.t.ThreadedObsTest - Got root 3

The emissions are now back on the main thread but the subscriptions are being processed in a threadpool.

That is the difference, when subscribeOn is used the emissions are performed on the specified Scheduler, when observeOn is used the subscriptions are performed are on the specified scheduler!

And the output when both are specified is equally predictable. Now in all cases I had created a Scheduler using a ThreadPool with 5 threads but only 1 of the threads has really been used both for emitting values and for processing subscriptions, this is actually the normal behavior of Observables. If you want to make more efficient use of the Threadpool, one approach may be to create multiple Observable's, say for eg, if I have a service which returns pages of data this way:

public Observable<Integer> getPages(int totalPages) {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            logger.info("Getting pages");
            for (int i = 1; i <= totalPages; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
}

and another service which acts on each page of the data:

public Observable<String> actOnAPage(int pageNum) {
    return Observable.<String>create(s -> {
        Util.delay(200);
        logger.info("Acting on page {}",  pageNum);
        s.onNext("Page " + pageNum);
        s.onCompleted();
    });
}

a way to use a Threadpool to process each page of data would be to chain it this way:

getPages(5).flatMap(  page -> aService.actOnAPage(page).subscribeOn(Schedulers.from(executor1)) )
                .subscribe(s -> {
                    logger.info("Completed Processing page: {}", s);
        });

see how the subscribeOn is on the each Observable acting on a page. With this change, the output would look like this:

21:15:45.572 [main] INFO  o.t.ThreadedObsTest - Getting pages
21:15:45.787 [SubscribeOn-1] INFO  o.t.GeneralService - Acting on page 2
21:15:45.787 [SubscribeOn-0] INFO  o.t.GeneralService - Acting on page 1
21:15:45.787 [SubscribeOn-4] INFO  o.t.GeneralService - Acting on page 5
21:15:45.787 [SubscribeOn-3] INFO  o.t.GeneralService - Acting on page 4
21:15:45.787 [SubscribeOn-2] INFO  o.t.GeneralService - Acting on page 3
21:15:45.789 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 2
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 1
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 3
21:15:45.790 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 4
21:15:45.791 [SubscribeOn-1] INFO  o.t.ThreadedObsTest - Completed Processing page: Page 5

Now the threads in the threadpool are being used uniformly.

2 comments:

  1. Hey, thanks for the post, it's really useful.
    Could you upload and link the project in Github to test other use case ?

    ReplyDelete