Sunday, November 13, 2016

Spring Kafka Producer/Consumer sample

My objective here is to show how Spring Kafka provides an abstraction to raw Kafka Producer and Consumer API's that is easy to use and is familiar to someone with a Spring background.

Sample scenario

The sample scenario is a simple one, I have a system which produces a message and another which processes it

Implementation using Raw Kafka Producer/Consumer API's

To start with I have used raw Kafka Producer and Consumer API's to implement this scenario. If you would rather look at the code, I have it available in my github repo here.


The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:

KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a json representation.

Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster, here I have used a synchronous version of the sender which waits for a response to be back.

ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);

RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();


On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a json message and translate that to the domain instance:

KafkaConsumer<String, WorkUnit> consumer  
    = new KafkaConsumer<>(props, stringKeyDeserializer(), workUnitJsonValueDeserializer());

Once an instance of KafkaConsumer is available a listener loop can be put in place which reads a batch of records, processes them and waits for more records to come through:


try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
  "consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());

} finally {

Implementation using Spring Kafka 

I have the implementation using Spring-kafka available in my github repo.


Spring-Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:

public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());

public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;

One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as json and then to convert it back, Spring-Kafka provides Seralizer/Deserializer for json out of the box.

And using KafkaTemplate to send a message:

SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();"topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);


The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. Here is first the configuration to set-up a listener container:

public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    return factory;

public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());

and the service which responds to messages read by the container:

public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {"Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);

Here all the complexities of setting up a listener loop like with the raw consumer is avoided and is nicely hidden by the listener container.


I have brushed over a lot of the internals of setting up batch sizes, variations in acknowledgement, different API signatures. My intention is just to demonstrate a common use case using the raw Kafka API's and show how Spring-Kafka wrapper simplifies it.

If you are interested in exploring further, the raw producer consumer sample is available here and the Spring Kafka one here


  1. Learning is an art! It depends on us how we learn and gain, goof informative blog, connecting with it, no doubt you get your problems solved.

  2. I've been trying to get a similar setup to this to work on my system but for the life of me, it won't work. the producer times out and the consumer never sees the message. I've got spring-kafka 1.1.3.release and docker-kafka

    2017-03-29 11:16:29.913 ERROR [-,,,] 8367 --- [ad | producer-1] : Exception thrown when sending a message with key='58' and payload='{"callback":"","memberId":"12345678A","meta-data":{"pts-lcp-cid":...' to topic memberValidation:

    org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for memberValidation-0 due to 30096 ms has passed since batch creation plus linger time

  3. Appreciate this post. Let me try it out.