Tuesday, November 29, 2016

Using Kafka with Junit

One of the neat features that the excellent Spring Kafka project provides, apart from an easier-to-use abstraction over the raw Kafka Producer and Consumer, is a way to use Kafka in tests. It does this by providing an embedded version of Kafka that can be set up and torn down very easily.

All that a project needs to do to include this support is to add the "spring-kafka-test" module as a dependency, for a Gradle build the following way:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.
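
For a Maven build the equivalent dependency (assuming the Spring snapshot repository is configured, since this is a snapshot version) would be:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>1.1.2.BUILD-SNAPSHOT</version>
    <scope>test</scope>
</dependency>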

With this dependency in place, an embedded Kafka cluster can be spun up in a test using a JUnit @ClassRule:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

This starts up a Kafka cluster with 2 brokers and a topic called "messages" with 2 partitions; the class rule makes sure that the cluster is spun up before the tests are run and then shut down at the end.

Here is what a sample using the raw Kafka Producer/Consumer against this embedded Kafka cluster looks like; the embedded Kafka instance can be used for retrieving the properties required by the Producer/Consumer:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();


Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

A little more comprehensive test is available here.

Tuesday, November 22, 2016

Recipe for getting started with Spring Boot and Angular 2

I am primarily a service developer who has to create a passable UI once in a while. I was adept at basic AngularJS 1 based UIs and could get stuff done using an approach that I have outlined before. With the advent of Angular 2 I unfortunately had to throw my previous approach out of the window, but I now have an approach with Spring Boot/Angular 2 that works equally well.

The approach essentially hinges on the fact that a Spring Boot web application looks for static content in a very specific location - the src/main/resources/static folder from the root of the project - so if I can get the final JS content into this folder, then I am golden.

So let us jump into it.

Pre-requisites

There is primarily one pre-requisite - the excellent angular-cli tool, which is a blessing for UI-ignorant developers like me.

The second, optional but useful, pre-requisite is the Spring Boot CLI tool, described here.


Generating a SPA Project


Given these two tools, first create a Spring Boot web project either by starting from http://start.spring.io or using the following CLI command:

spring init --dependencies=web spring-boot-angular2-static-sample

At this point a starter project should have been generated in the spring-boot-angular2-static-sample folder. From that folder, generate an Angular 2 project using the angular-cli:

ng init

Change the location where angular-cli builds its artifacts: edit angular-cli.json and modify it as follows:
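
The change (originally shown as a screenshot) is the outDir attribute of the app entry in angular-cli.json, pointed at Spring Boot's static folder. A minimal sketch, with the other generated attributes omitted and the exact layout depending on the angular-cli version:

{
  "apps": [
    {
      "outDir": "src/main/resources/static"
    }
  ]
}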




Now build the static content:

ng build

This should get the static content into the src/main/resources/static folder.

And start up the Spring-Boot app:

mvn spring-boot:run

and the Angular 2 based UI should render cleanly!

Live Reload

One of the advantages of using angular-cli is the excellent tool-chain it comes with - one piece of which is the ability to make changes and see them reflected in the UI right away. This ability is lost with the approach documented here, where the UI may be primarily driven by services hosted in the Spring Boot project. Getting the live-reload feature back for Angular 2 development is, however, a cinch.

First proxy the backend: create a proxy.conf.json file with an entry that looks like this:

{
  "/api": {
    "target": "http://localhost:8080",
    "secure": false
  }
}

and start up the Angular-cli server using the command:

ng serve --proxy-config proxy.conf.json

and start up the server part independently using:

mvn spring-boot:run

That is it - now the UI development can be carried out independently of the server-side APIs! For an even greater punch, use the excellent devtools packaged with Spring Boot to get a live-reload (more of a restart) feature on the server side also.
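
Pulling devtools in is just a dependency addition - for the Maven build used here, something along these lines:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <optional>true</optional>
</dependency>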

Conclusion

This is the recipe I use for any basic UI that I may have to create. The approach probably is not ideal for large projects but should be a perfect fit for small internal ones. I have a sample starter with a backend call hooked up available in my github repo here.

Sunday, November 13, 2016

Spring Kafka Producer/Consumer sample

My objective here is to show how Spring Kafka provides an abstraction over the raw Kafka Producer and Consumer APIs that is easy to use and is familiar to someone with a Spring background.

Sample scenario


The sample scenario is a simple one: I have a system which produces a message and another which processes it.


Implementation using Raw Kafka Producer/Consumer API's

To start with, I have used the raw Kafka Producer and Consumer APIs to implement this scenario. If you would rather look at the code, I have it available in my github repo here.

Producer

The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:

KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a JSON representation.
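
The workUnitJsonSerializer() referenced above is not reproduced in this post; a minimal sketch of such a custom Serializer, assuming Jackson's ObjectMapper does the JSON conversion (the class name here is hypothetical, the one in the sample repo may differ):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// hypothetical serializer for illustration
public class WorkUnitJsonSerializer implements Serializer<WorkUnit> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, WorkUnit workUnit) {
        try {
            // write the domain object out as JSON bytes
            return objectMapper.writeValueAsBytes(workUnit);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing WorkUnit", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}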

Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster; here I have used a synchronous version of the sender which waits for the response to come back.

ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);

RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();

Consumer

On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a JSON message and translate it to the domain instance:

KafkaConsumer<String, WorkUnit> consumer  
    = new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());
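
Similarly, a sketch of what the workUnitJsonValueDeserializer() could return - again assuming Jackson under the covers, and again with a hypothetical class name:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

// hypothetical deserializer for illustration
public class WorkUnitJsonDeserializer implements Deserializer<WorkUnit> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public WorkUnit deserialize(String topic, byte[] data) {
        try {
            // read the JSON bytes back into the domain type
            return objectMapper.readValue(data, WorkUnit.class);
        } catch (IOException e) {
            throw new SerializationException("Error deserializing WorkUnit", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}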

Once an instance of KafkaConsumer is available, a listener loop can be put in place which reads a batch of records, processes them, and waits for more records to come through:

consumer.subscribe(Collections.singletonList("workunits"));

try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());

        }
    }
} finally {
    this.consumer.close();
}


Implementation using Spring Kafka 


I have the implementation using Spring Kafka available in my github repo.

Producer

Spring-Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:

@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}

@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic("workunits");
    return kafkaTemplate;
}

One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as JSON and then convert it back, Spring Kafka provides a Serializer/Deserializer for JSON out of the box.
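
For instance, the value Serializer/Deserializer beans can simply delegate to the classes Spring Kafka ships with - a sketch, assuming the WorkUnit domain type from earlier:

// org.springframework.kafka.support.serializer.JsonSerializer / JsonDeserializer
@Bean
public Serializer<WorkUnit> workUnitJsonSerializer() {
    return new JsonSerializer<>();
}

@Bean
public Deserializer<WorkUnit> workUnitJsonValueDeserializer() {
    return new JsonDeserializer<>(WorkUnit.class);
}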

And using KafkaTemplate to send a message:

SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

Consumer

The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. Here, first, is the configuration to set up a listener container:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}



and the service which responds to messages read by the container:

@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}

Here all the complexity of setting up a listener loop, as was needed with the raw consumer, is avoided; it is nicely hidden by the listener container.


Conclusion

I have glossed over a lot of the internals - setting up batch sizes, variations in acknowledgement, different API signatures. My intention is just to demonstrate a common use case using the raw Kafka APIs and show how the Spring Kafka wrapper simplifies it.

If you are interested in exploring further, the raw producer/consumer sample is available here and the Spring Kafka one here.

Saturday, November 5, 2016

Parallelizing Hystrix calls

This is more common sense than anything else. If you make calls to multiple remote systems and aggregate the results in some way, represented as a marble diagram here:



And if you protect each of the remote calls using the awesome Hystrix library, then the best way to aggregate the results is using native rx-java operators.

So consider a Hystrix command, assume that such a command in reality would wrap around a remote call:

public class SampleRemoteCallCommand1 extends HystrixCommand<String> {

    public SampleRemoteCallCommand1() {
        super(Setter.withGroupKey(
                HystrixCommandGroupKey.Factory.asKey("sample1"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("sample1")
                )
        );
    }

    @Override
    protected String run() throws Exception {
        DelayUtil.delay(700);
        return "something";
    }

    @Override
    protected String getFallback() {
        return "error";
    }
}


then a service which aggregates responses from multiple such remote calls would look like this:

SampleRemoteCallCommand1 command1 = new SampleRemoteCallCommand1();
SampleRemoteCallCommand2 command2 = new SampleRemoteCallCommand2();

Observable<String> result1Obs = command1.toObservable();
Observable<Integer> result2Obs = command2.toObservable();

Observable<String> result =
        Observable.zip(result1Obs, result2Obs, (result1, result2) -> result1 + result2);
        

Essentially, instead of synchronously executing the Hystrix commands, we just use the "toObservable()" method to return an Rx-Java Observable representation of the result and use the operators that Observable provides to aggregate the results together - in this specific instance, the zip operator.
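
To actually materialize the aggregated value - in a test, for instance - the zipped Observable can simply be blocked on (rx-java 1 API). A small sketch; the logger here is an assumption:

// block for the single combined emission - fine in a test, avoid in production code
String combined = result.toBlocking().single();
LOGGER.info("combined result = {}", combined);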

The main advantage of this approach is that we re-use the Hystrix thread pool that each command uses to run the tasks in parallel. Here is a sample project which demonstrates this - https://github.com/bijukunjummen/sample-hystrix-parallel

Just a note of caution - if your Hystrix command does not have a fallback and if you use this approach with one of the remote calls failing, you may see a memory leak in your app - I had opened an issue regarding this leak, which the excellent Netflix Team has already addressed.

Friday, October 21, 2016

Tracing Spring Integration Flow with Spring Cloud Sleuth

Spring Cloud Sleuth is an awesome project that provides a way to trace requests that span multiple systems. Spring Cloud Sleuth can optionally export this trace data to Zipkin, where it can be visualized in a neat way. I especially love the fact that Spring Cloud Sleuth integrates deeply with Spring Integration and can nicely trace out the flow of a message.


Consider the following -



I have two different systems here - a work dispatcher producing "Work Unit"s and a Work Handler consuming them. They talk over a RabbitMQ broker. Just to mix the flow up a bit, I also have a retry mechanism in place which retries the message every 20 seconds in case of a processing failure.



Both these systems are described using the Spring Integration Java DSL; the outbound flow dispatching the WorkUnits looks like this:

@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .log()
                .handle(Amqp.outboundGateway(rabbitConfig.worksRabbitTemplate()))
                .transform(Transformers.fromJson(WorkUnitResponse.class))
                .get();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows.from("errorChannel")
                .transform((MessagingException e) -> e.getFailedMessage().getPayload())
                .transform(Transformers.fromJson(WorkUnit.class))
                .transform((WorkUnit failedWorkUnit) -> new WorkUnitResponse(failedWorkUnit.getId(), failedWorkUnit.getDefinition(), false))
                .get();
    }

}

This is eminently readable - the "Work Unit" comes through a "works channel" and is dispatched to a RabbitMQ queue after being transformed to JSON. Note that the dispatch is via an outbound gateway; this means that Spring Integration puts the necessary infrastructure in place to wait for a reply to come back from the remote system. In case of an error, say if the reply does not appear in time, a stock response is provided back to the user.

On the Work Handler side a similar flow handles the message:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundGateway(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count < 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .transform(Transformers.toJson())
                .get();
    }

}

The only wrinkle in this flow is the retry logic which discards the message after 3 retries. If you are interested in the details of how the retry is being hooked up, I have more details here.


So now, given this fairly involved flow, here is what it looks like with Spring Cloud Sleuth and Zipkin integrated:



Spring Cloud Sleuth intercepts every message channel and tags the message as it flows through the channel.
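
Getting this tracing in place is mostly a matter of adding the Sleuth and Zipkin dependencies - for a Gradle build, something along these lines (artifact names from the Spring Cloud Sleuth 1.x line, with versions managed by the Spring Cloud BOM):

compile "org.springframework.cloud:spring-cloud-starter-sleuth"
compile "org.springframework.cloud:spring-cloud-sleuth-zipkin"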


Now for something a little more interesting: if the flow were more complex, with 3 retries each 20 seconds apart, the path is again beautifully brought out by Spring Cloud Sleuth and its integration with Zipkin.


Conclusion


If you maintain a Spring Integration based flow, Spring Cloud Sleuth is a worthwhile addition to the project - it can trace the runtime path of a message and show it visually using the Zipkin UI. I look forward to exploring more of the nuances of this excellent project.


The sample that I have demonstrated here is available in my github repo - https://github.com/bijukunjummen/si-with-sleuth-sample

Thursday, September 29, 2016

Spring-Reactive samples - Mono and Single

This is just a small learning from my previous post, where I had tried out Spring's native support for reactive programming.

Just to quickly recap, my objective was to develop a reactive service which takes in a request which looks like this:

{
 "id":1,
  "delay_by": 2000,
  "payload": "Hello",
  "throw_exception": false
}

and returns a response along these lines:

{
  "id": "1",
  "received": "Hello",
  "payload": "Response Message"
}

I had demonstrated this in two of the ways that Spring's upcoming reactive model supports - using the Reactor-Core Flux type as a return type, and using the Rx-java Observable type.

However the catch with these types is that the response would look something like this:

[{"id":"1","received":"Hello","payload":"From RxJavaService"}]

Essentially an array, and the reason is obvious - Flux and Observable represent zero or more asynchronous emissions, and so the Spring Reactive Web framework has to represent such a result as an array.

The fix to return the expected JSON is essentially to return a type which represents a single value - such a type is Mono in Reactor-Core or Single in Rx-java. Both these types are as capable as their multi-valued counterparts in providing functions which combine and transform their elements.

So with this change the controller signature with Mono looks like this:

@RequestMapping(path = "/handleMessageReactor", method = RequestMethod.POST)
public Mono<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessageMono(message);
}


and with Single like this:

@RequestMapping(path = "/handleMessageRxJava", method = RequestMethod.POST)
public Single<MessageAcknowledgement> handleMessage(@RequestBody Message message) {
  return this.aService.handleMessageSingle(message);
}
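
The service methods backing these controllers are not reproduced here; a minimal sketch of the Mono variant, assuming a MessageAcknowledgement constructor along the lines of the JSON above and ignoring the delay and exception handling:

public Mono<MessageAcknowledgement> handleMessageMono(Message message) {
    // wrap the acknowledgement in a Mono representing exactly one value
    // (constructor arguments are an assumption based on the sample JSON)
    return Mono.fromCallable(() ->
            new MessageAcknowledgement(message.getId(), message.getPayload(), "Response Message"));
}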

I have the sample code available in my github repo.

Saturday, September 10, 2016

RabbitMQ retries using Spring Integration

I recently read about an approach to retry with RabbitMQ here and wanted to try a similar approach with Spring Integration, which provides an awesome set of integration abstractions.

TL;DR - the problem being solved is to retry a message (in case of processing failures) a few times with a large delay between retries (say 10+ minutes). The approach makes use of RabbitMQ's support for Dead Letter Exchanges and looks something like this:




The gist of the flow is:
1. A Work dispatcher creates "Work Unit"(s) and sends them to a RabbitMQ queue via an exchange.
2. The Work queue is set up with a Dead Letter exchange. If the message processing fails for any reason, the "Work Unit" ends up in the Work Unit Dead Letter Queue.
3. The Work Unit Dead Letter queue is in turn set up with the Work Unit exchange as its Dead Letter Exchange, thereby creating a cycle. Further, the expiration of messages in the dead letter queue is set to, say, 10 minutes; this way, once a message expires it will be back again in the Work Unit queue.
4. To break the cycle, the processing code has to stop processing once a certain retry-count threshold is exceeded.



Implementation using Spring Integration


I have covered a straight happy-path flow using Spring Integration and RabbitMQ before; here I will mostly be building on top of that code.

A good part of the set-up is the configuration of the appropriate dead letter exchanges/queues, and looks like this when expressed using Spring's Java Configuration:

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    Exchange worksExchange() {
        return ExchangeBuilder.topicExchange("work.exchange")
                .durable()
                .build();
    }


    @Bean
    public Queue worksQueue() {
        return QueueBuilder.durable("work.queue")
                .withArgument("x-dead-letter-exchange", worksDlExchange().getName())
                .build();
    }

    @Bean
    Binding worksBinding() {
        return BindingBuilder
                .bind(worksQueue())
                .to(worksExchange()).with("#").noargs();
    }
    
    // Dead letter exchange for holding rejected work units..
    @Bean
    Exchange worksDlExchange() {
        return ExchangeBuilder
                .topicExchange("work.exchange.dl")
                .durable()
                .build();
    }

    //Queue to hold Deadletter messages from worksQueue
    @Bean
    public Queue worksDLQueue() {
        return QueueBuilder
                .durable("works.queue.dl")
                .withArgument("x-message-ttl", 20000)
                .withArgument("x-dead-letter-exchange", worksExchange().getName())
                .build();
    }

    @Bean
    Binding worksDlBinding() {
        return BindingBuilder
                .bind(worksDLQueue())
                .to(worksDlExchange()).with("#")
                .noargs();
    }
    ...
}


Note that here I have set the TTL of the Dead Letter queue to 20 seconds; this means that after 20 seconds a failed message will be back in the processing queue. Once this set-up is in place and the appropriate structures are created in RabbitMQ, the consuming part of the code, expressed using the Spring Integration Java DSL, looks like this:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .get();
    }

}

Most of the retry logic here is handled by the RabbitMQ infrastructure; the only change is to break the cycle by explicitly discarding the message after a certain number of retries. This break is expressed as a filter above, looking at the "x-death" header that RabbitMQ adds to the message once it has been sent to the Dead Letter exchange. The filter is admittedly a little ugly - it could likely be expressed a little better in Java code, as sketched below.
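
One possible Java-based version of that filter - just a sketch, assuming the "x-death" header arrives as the usual list of maps that RabbitMQ populates, and using the Message.class variant of filter to get access to the headers:

.filter(Message.class, m -> {
    // let the message through until it has been dead-lettered more than 3 times
    List<Map<String, ?>> xDeath = (List<Map<String, ?>>) m.getHeaders().get("x-death");
    return xDeath == null || ((Long) xDeath.get(0).get("count")) <= 3;
}, f -> f.discardChannel("nullChannel"))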



One more thing to note is that the retry logic could have been expressed in-process using Spring Integration; however, I wanted to investigate a flow where the retry delays can be high (say 15 to 20 minutes), which does not work well in-process and is also not cluster safe, as I want any instance of the application to potentially handle the retry of a message.

If you want to explore further, do try the sample at my github repo - https://github.com/bijukunjummen/si-dsl-rabbit-sample

Reference:

Retry With RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq