Tuesday, November 29, 2016

Using Kafka with JUnit

One of the neat features that the excellent Spring Kafka project provides, apart from an easier-to-use abstraction over the raw Kafka Producer and Consumer, is a way to use Kafka in tests. It does this by providing an embedded version of Kafka that can be set up and torn down very easily.

All that a project needs to include this support is the "spring-kafka-test" module. For a Gradle build it is added the following way:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.

With this dependency in place, an Embedded Kafka can be spun up in a test using the @ClassRule of JUnit:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

This starts up a Kafka cluster with 2 brokers and a topic called "messages" with 2 partitions. The class rule makes sure that the cluster is spun up before the tests are run and shut down once they have finished.

Here is what a sample with the raw Kafka Producer/Consumer using this embedded Kafka cluster looks like. The embedded Kafka instance can be used for retrieving the properties required by the Kafka Producer/Consumer:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
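executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    // Subscribe to the topic that the embedded cluster created
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                // One count per record; the test waits for all 4
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});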

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();
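
The latch-based waiting used in the test can be tried out without Kafka at all. The following is a minimal JDK-only sketch of that pattern, not the test from the post: the in-memory queue is a hypothetical stand-in for the "messages" topic, and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-alone sketch of the latch pattern; no Kafka classes are involved.
final class LatchedConsumerSketch {

    /** Consumes the messages on a background thread and reports whether all of them arrived in time. */
    static boolean allConsumed(List<String> messages, long timeoutSeconds) {
        // Hypothetical stand-in for the topic: an in-memory queue pre-filled with the messages
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(messages);
        CountDownLatch latch = new CountDownLatch(messages.size());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                while (latch.getCount() > 0) {
                    // Poll with a short timeout, like the consumer loop in the test
                    String value = topic.poll(100, TimeUnit.MILLISECONDS);
                    if (value != null) {
                        latch.countDown();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            // The calling (test) thread blocks here until every message was seen or the timeout expires
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(allConsumed(Arrays.asList("message0", "message1", "message2", "message3"), 5));
    }
}
```

The point of the latch is that the assertion runs on the test thread while consumption happens on another thread, so the test fails with a timeout instead of hanging if a message never arrives.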

A slightly more comprehensive test is available here.

Tuesday, November 22, 2016

Recipe for getting started with Spring Boot and Angular 2

I am primarily a service developer who has to create some passable UIs once in a while. I was adept at basic AngularJS 1 based UIs and could get stuff done by using an approach that I have outlined before. With the advent of Angular 2 I unfortunately had to throw my previous approach out of the window, and I now have an approach with Spring Boot/Angular 2 that works equally well.

The approach essentially relies on the fact that a Spring Boot web application looks for static content in a very specific location - the src/main/resources/static folder from the root of the project, so if I can get the final JS content into this folder, then I am golden.

So let us jump into it.


There is primarily one pre-requisite - the excellent angular-cli tool, which is a blessing for UI-ignorant developers like me.

The second optional but useful pre-requisite is the Spring-Boot CLI tool described here

Generating a SPA Project

Given these two tools, first create a Spring Boot web project either by starting from http://start.spring.io or using the following CLI command:

spring init --dependencies=web spring-boot-angular2-static-sample

At this point a starter project should have been generated in the spring-boot-angular2-static-sample folder. From that folder generate an Angular 2 project using angular-cli:

ng init

Change the location where angular-cli builds the artifacts: edit angular-cli.json so that the build output directory points to src/main/resources/static.

Now build the static content:

ng build

this should get the static content to the src/main/resources/static folder.

And start up the Spring-Boot app:

mvn spring-boot:run

and the Angular 2 based UI should render cleanly!

Live Reload

One of the advantages of using angular-cli is the excellent tool-chain that it comes with, including the ability to make changes and see them reflected immediately in the UI. This ability is lost with the approach documented here, where the UI may be primarily driven by services hosted on the Spring Boot project. Getting the live reload feature back for Angular 2 development is however a cinch.

First proxy the backend: create a proxy.conf.json file with an entry which looks like this:

{
  "/api": {
    "target": "http://localhost:8080",
    "secure": false
  }
}

and start up the Angular-cli server using the command:

ng serve --proxy-config proxy.conf.json

and start up the server part independently using:

mvn spring-boot:run

That is it, now the UI development can be carried out independent of the server side APIs! For an even greater punch, just use the excellent devtools that is packaged with Spring Boot to get a live reload (more of a restart) feature on the server side as well.


This is the recipe I use for any basic UI that I may have to create. The approach is probably not ideal for large projects but should be a perfect fit for small internal projects. I have a sample starter with a backend call hooked up available in my github repo here.

Sunday, November 13, 2016

Spring Kafka Producer/Consumer sample

My objective here is to show how Spring Kafka provides an abstraction over the raw Kafka Producer and Consumer APIs that is easy to use and is familiar to someone with a Spring background.

Sample scenario

The sample scenario is a simple one: I have a system which produces a message and another which processes it.

Implementation using Raw Kafka Producer/Consumer APIs

To start with, I have used the raw Kafka Producer and Consumer APIs to implement this scenario. If you would rather look at the code, I have it available in my github repo here.


The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:

KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a JSON representation.

Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster. Here I have used a synchronous version of the sender, which blocks on the returned future until the response is back.

ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);

RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();


On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a JSON message and translate it to the domain instance:

KafkaConsumer<String, WorkUnit> consumer  
    = new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());

Once an instance of KafkaConsumer is available a listener loop can be put in place which reads a batch of records, processes them and waits for more records to come through:


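// Subscribe before polling; "workunits" is the topic the producer writes to
this.consumer.subscribe(Collections.singletonList("workunits"));
try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
    }
} finally {
    this.consumer.close();
}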

Implementation using Spring Kafka 

I have the implementation using Spring-Kafka available in my github repo.


Spring-Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:

@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}

@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    // sendDefault(..) needs a default topic to be set on the template
    kafkaTemplate.setDefaultTopic("workunits");
    return kafkaTemplate;
}

One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as JSON and then convert it back, Spring-Kafka provides a Serializer/Deserializer for JSON out of the box.

And using KafkaTemplate to send a message:

SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);


The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. Here first is the configuration to set up a listener container:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Wire in the consumer factory that the containers use to create consumers
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}

and the service which responds to messages read by the container:

@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    // Kafka offsets are longs, so the offset header is bound to a long
    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) long offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}

Here all the complexity of setting up a listener loop, as with the raw consumer, is avoided and nicely hidden by the listener container.
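
To make the idea of a listener container concrete, here is a toy JDK-only sketch of the pattern: a small class owns the poll loop and hands each record to a callback. This is not how Spring-Kafka implements its containers; the class names are hypothetical and an in-memory queue stands in for the topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ListenerContainerSketch {

    /** Owns the poll loop so that application code only has to supply a callback. */
    static final class SimpleListenerContainer<T> {
        private final BlockingQueue<T> source;   // hypothetical stand-in for a topic
        private final Consumer<T> listener;
        private final Thread worker;
        private volatile boolean running = true;

        SimpleListenerContainer(BlockingQueue<T> source, Consumer<T> listener) {
            this.source = source;
            this.listener = listener;
            this.worker = new Thread(this::pollLoop, "listener-container");
        }

        void start() {
            worker.start();
        }

        void stop() {
            running = false;
            worker.interrupt();
        }

        private void pollLoop() {
            try {
                while (running) {
                    T record = source.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        listener.accept(record); // the only part the application writes
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called while polling
            }
        }
    }

    /** Feeds the given values through a container and returns what the listener received. */
    static List<String> deliver(List<String> values) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(values);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(values.size());
        SimpleListenerContainer<String> container = new SimpleListenerContainer<>(topic, value -> {
            received.add(value);
            done.countDown();
        });
        container.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(deliver(Arrays.asList("workUnit-1", "workUnit-2"))); // prints [workUnit-1, workUnit-2]
    }
}
```

The callback passed to the container plays the role of the @KafkaListener method: it never sees the loop, the polling timeout or the shutdown handling.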


I have glossed over a lot of the internals: setting up batch sizes, variations in acknowledgement, and different API signatures. My intention is just to demonstrate a common use case using the raw Kafka APIs and show how the Spring-Kafka wrapper simplifies it.

If you are interested in exploring further, the raw producer/consumer sample is available here and the Spring Kafka one here.

Saturday, November 5, 2016

Parallelizing Hystrix calls

This is more common sense than anything else. If you make calls to multiple remote systems and aggregate the results in some way, represented as a marble diagram here:

And you protect each of the remote calls using the awesome Hystrix library, then the best way to aggregate the results is to use native RxJava operators.

So consider a Hystrix command, and assume that such a command would in reality wrap a remote call:

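public class SampleRemoteCallCommand1 extends HystrixCommand<String> {

    public SampleRemoteCallCommand1() {
        // The group key name is illustrative
        super(HystrixCommandGroupKey.Factory.asKey("sample1"));
    }

    @Override
    protected String run() throws Exception {
        return "something";
    }

    @Override
    protected String getFallback() {
        return "error";
    }
}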

A service which aggregates responses from multiple such remote calls would look like this:

SampleRemoteCallCommand1 command1 = new SampleRemoteCallCommand1();
SampleRemoteCallCommand2 command2 = new SampleRemoteCallCommand2();

Observable<String> result1Obs = command1.toObservable();
Observable<Integer> result2Obs = command2.toObservable();

Observable<String> result =
        Observable.zip(result1Obs, result2Obs, (result1, result2) -> result1 + result2);

Essentially, instead of synchronously executing the Hystrix command, we use the "toObservable()" method to return an RxJava Observable representation of the result and then use one of the ways that Observable provides to aggregate the results, in this specific instance the zip operator.

The main advantage of this approach is that we re-use the Hystrix thread pool that each command uses to run the tasks in parallel. Here is a sample project which demonstrates this - https://github.com/bijukunjummen/sample-hystrix-parallel
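
For readers who want to try the shape of this aggregation without Hystrix or RxJava on the classpath, here is a rough JDK-only analogue. It swaps in CompletableFuture.thenCombine for Observable.zip and a plain fixed thread pool for the Hystrix thread pools, so it illustrates the idea rather than the code in the sample project. The two "remote calls" and their return values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelAggregationSketch {

    // Hypothetical stand-ins for the two remote calls; each one just sleeps and returns a value
    static String remoteCall1() {
        sleep(200);
        return "something";
    }

    static Integer remoteCall2() {
        sleep(200);
        return 42;
    }

    /** Starts both calls on a pool, then combines the two results once both are available. */
    static String aggregate() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> result1 =
                    CompletableFuture.supplyAsync(ParallelAggregationSketch::remoteCall1, pool);
            CompletableFuture<Integer> result2 =
                    CompletableFuture.supplyAsync(ParallelAggregationSketch::remoteCall2, pool);
            // thenCombine plays the role of zip: it pairs one result from each source
            return result1.thenCombine(result2, (r1, r2) -> r1 + r2).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(aggregate()); // prints something42
    }
}
```

Because both calls are submitted before either result is awaited, the total wait is governed by the slower call rather than by the sum of the two.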

Just a note of caution - if your Hystrix command does not have a fallback and if you use this approach with one of the remote calls failing, you may see a memory leak in your app - I had opened an issue regarding this leak, which the excellent Netflix Team has already addressed.