Listening to Kafka Topics
This tutorial explains how to consume messages from a Kafka topic in your Spring application.
Dependencies
Add the following dependencies to your pom.xml:
    <!-- Apache Kafka Spring Integration -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>[use latest version]</version>
    </dependency>
    <!-- Apache Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>[use latest version]</version>
    </dependency>
    <!-- Spring Messaging -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>[use latest version]</version>
    </dependency>
This should add all the libraries you need to communicate with Kafka to your project.
Configure Kafka
Next, the connection to Kafka needs to be configured. We will use Java configuration for that. Add a new class to your project and annotate it with:
    @Configuration // this will tell Spring that this is a configuration class
    @EnableKafka   // this will tell the Spring-Kafka integration that you want to talk to Kafka
First, there are some values that you need to configure in order to consume messages from Kafka. Let's write a method that creates those configurations:
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // record keys are read as integers, record values as strings
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "your.app.client.id");
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "the.group.your.app.belongs.to");
        return props;
    }
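The ConsumerConfig constants are plain strings, the same property names you would use in a Kafka properties file. As a rough sketch of what the map above ends up containing, the following standalone class builds it with the literal keys and needs no Kafka classes on the classpath. The class name ConsumerPropsSketch and the method parameters are assumptions made for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Builds the same settings as consumerConfigs(), but with the literal
    // property names that the ConsumerConfig constants stand for.
    public static Map<String, Object> consumerConfigs(String bootstrapServers,
                                                      String clientId,
                                                      String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // Deserializers may be given as class names instead of Class objects.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("client.id", clientId);
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerConfigs(
                "localhost:9092",
                "your.app.client.id",
                "the.group.your.app.belongs.to");
        // Print every setting; HashMap does not guarantee any particular order.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Passing the host, client id and group id as parameters makes it easy to read them from your environment later instead of hard-coding them.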
Next, we need a factory to create the consumer instances. Add the following method. The type parameters match the key and value deserializers configured above.
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
Note how it calls our configuration method.
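Every consumer created by this factory decodes record keys and values with the deserializers named in the configuration. To give a feel for what that means for the raw bytes, here is a minimal standalone sketch that mimics the behaviour of an integer key deserializer (4 bytes, big-endian) and a string value deserializer (UTF-8). It is an illustration only, not Kafka's own code, and the class and method names are made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DeserializerSketch {

    // Mimics an integer key deserializer: exactly 4 bytes, big-endian.
    public static Integer decodeKey(byte[] data) {
        if (data == null) {
            return null; // records without a key stay null
        }
        if (data.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt(); // ByteBuffer is big-endian by default
    }

    // Mimics a string value deserializer: decode the bytes as UTF-8.
    public static String decodeValue(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = ByteBuffer.allocate(4).putInt(42).array();
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeKey(key) + " -> " + decodeValue(value)); // prints "42 -> hello"
    }
}
```

The practical consequence is that whatever produces messages for your topic must write keys in a format your key deserializer understands, or leave the key empty.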
The consumer factory will be used by a listener factory. According to the javadoc, "the KafkaListenerContainerFactory is responsible to create the listener container for a particular endpoint." Add the following code:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
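Make sure to annotate this method with @Bean, so that Spring can automatically find it when creating listener containers. Keep the method name kafkaListenerContainerFactory as well: it becomes the bean name, and that is the name @KafkaListener looks up by default.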
Last but not least, add the following method to register a new listener:
    @Bean
    public KafkaTopicListener receiver() {
        return new KafkaTopicListener();
    }
The KafkaTopicListener is a class we will create in the next step, so for now this should give you a compile error.
Writing a listener
Let's assume there is one Kafka topic we'd like to listen to with our app. We will write the code to do that in our KafkaTopicListener class. Add a new class called KafkaTopicListener to your project.
Then add a method to receive messages (the name of this method doesn't matter):
    // "logger" is a logger field you define in this class
    @KafkaListener(id = "test.listener.id", topics = "my_topic")
    public void receiveMessage(String message) {
        logger.info("Received message: " + message);
    }
Note the @KafkaListener annotation. It tells Spring that we want to listen to the topic "my_topic" and that the id of our listener is "test.listener.id". Our method will then simply log any message it gets.
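Putting the pieces together, the whole setup could look roughly like the sketch below. Several things here are assumptions rather than part of the steps above: the configuration class name KafkaConsumerConfig is made up, the listener is nested inside the configuration class only so that the example fits in one file (in your project it would be a separate class, as described), and java.util.logging stands in for whatever logging framework you use. It needs the dependencies from the beginning of this tutorial to compile.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig { // hypothetical class name

    // Connection and deserialization settings for the consumers.
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "your.app.client.id");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "the.group.your.app.belongs.to");
        return props;
    }

    // Creates consumers from the settings above.
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Creates the listener containers that drive @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Registers the listener as a Spring bean.
    @Bean
    public KafkaTopicListener receiver() {
        return new KafkaTopicListener();
    }

    // Nested only to keep this sketch in a single file.
    public static class KafkaTopicListener {

        private static final Logger logger =
                Logger.getLogger(KafkaTopicListener.class.getName());

        @KafkaListener(id = "test.listener.id", topics = "my_topic")
        public void receiveMessage(String message) {
            logger.info("Received message: " + message);
        }
    }
}
```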
That's it!
Now just start your webapp. If your app is correctly registered with Kafka, you should see a message in the Kafka log telling you something like this:
Assignment received from leader for group [your.group.id] for generation [x].
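The "assignment" in that log line is the set of topic partitions each member of the consumer group has been given; this is how a group divides the work of consuming a topic. The following standalone sketch shows the idea with a simple round-robin deal. It is a simplified illustration, not one of Kafka's actual assignment strategies, and the member names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    // Deals partitions 0..partitionCount-1 out to the group members in turn.
    public static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two instances of the app in the same group, topic with five partitions.
        System.out.println(assign(Arrays.asList("app-1", "app-2"), 5));
        // prints {app-1=[0, 2, 4], app-2=[1, 3]}
    }
}
```

Each partition ends up with exactly one owner, so starting a second instance of your app with the same group id splits the topic between the two instead of delivering every message twice.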
To test if your listener works, you can use the console producer program that ships with Kafka to send messages to your topic. If everything is set up correctly, they should be printed in your app log.
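If you would rather send a test message from Java than from the command line, a small producer along the following lines should do. This is a sketch under a few assumptions: the broker runs on localhost:9092 and the topic is "my_topic" as in the examples above, the class name TestMessageSender is made up, and the key is written as an integer so that it matches the IntegerDeserializer configured for the consumer. It uses the kafka-clients dependency and needs a running broker.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestMessageSender { // hypothetical class name

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers are the counterparts of the consumer's deserializers.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources closes the producer and flushes pending records.
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<Integer, String> record =
                    new ProducerRecord<>("my_topic", 1, "hello from the test sender");
            // Block until the broker has acknowledged the record.
            producer.send(record).get();
        }
    }
}
```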
If you don't see anything printed in your log, make sure your logger is set up correctly.