Listening to Kafka Topics

This tutorial explains how to consume messages from a Kafka topic in your Spring application, using the Spring for Apache Kafka (spring-kafka) integration.

Dependencies

Add the following dependencies to your pom.xml:

<!-- Apache Kafka Spring Integration -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>[use latest version]</version>
</dependency>


<!-- Apache Kafka -->
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>[use latest version]</version>
</dependency>

<!-- Spring Messaging -->
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-messaging</artifactId>
	<version>[use latest version]</version>
</dependency>

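This adds all the libraries your project needs to communicate with Kafka. Note that spring-kafka already pulls in kafka-clients and spring-messaging as transitive dependencies, so the two explicit entries mainly serve to pin their versions.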

Configure Kafka

Next, the connection to Kafka needs to be configured. We will use Java configuration for that. Add a new class to your project and annotate it with:

@Configuration // marks this class as a source of Spring bean definitions
@EnableKafka   // enables detection of @KafkaListener annotations on Spring beans

First, there are some values that you need to configure in order to consume messages from Kafka. Let's write a method that creates those configurations:

public Map<String, Object> consumerConfigs() {
	Map<String, Object> props = new HashMap<>();
	// list of host:port pairs used for establishing the initial connections
	// to the Kafka cluster
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
			"localhost:9092");
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			IntegerDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class);
	props.put(ConsumerConfig.CLIENT_ID_CONFIG, "your.app.client.id");
	
	// consumer groups allow a pool of processes to divide the work of
	// consuming and processing records
	props.put(ConsumerConfig.GROUP_ID_CONFIG, "the.group.your.app.belongs.to");

	return props;
}
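The ConsumerConfig constants used above are plain string keys that stand for the property names found in the Kafka consumer configuration documentation. As a rough sketch, the same settings could be written with literal keys, which can help when comparing this Java configuration with a properties file. The class name ConsumerPropertyNames is made up for this illustration, and the deserializers are given as fully qualified class names (which Kafka also accepts) so that the sketch compiles without any Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration; not part of the tutorial's project.
public class ConsumerPropertyNames {

    // Builds the same settings as consumerConfigs(), using the literal
    // property names that the ConsumerConfig constants stand for.
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("client.id", "your.app.client.id");
        props.put("group.id", "the.group.your.app.belongs.to");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting as "name = value"
        consumerConfigs().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In the actual configuration class, the ConsumerConfig constants remain preferable, since a mistyped constant fails at compile time while a mistyped string does not.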

Next, we need a factory that creates the consumer instances. Add the following method:

public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

Note how it calls our configuration method.

The consumer factory will be used by a listener factory. According to the javadoc, "the KafkaListenerContainerFactory is responsible to create the listener container for a particular endpoint." Add the following code:


@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

Make sure to annotate this method with @Bean and to keep the name kafkaListenerContainerFactory, because by default Spring looks up the listener container factory under that bean name when creating listener containers.

Last but not least, add the following method to register a new listener:


@Bean
public KafkaTopicListener receiver() {
    return new KafkaTopicListener();
}

KafkaTopicListener is a class we will create in the next step, so for now this will give you a compile error.

Writing a listener

Let's assume there is one Kafka topic we'd like to listen to with our app. We will write the code to do that in our KafkaTopicListener class. Add a new class called KafkaTopicListener to your project.

Then add a method to receive messages (the name of this method doesn't matter):

@KafkaListener(id = "test.listener.id", topics = "my_topic")
public void receiveMessage(String message) {
	logger.info("Received message: " + message);
}

Note the @KafkaListener annotation. It tells Spring that we want to listen to the topic "my_topic" and that the id of our listener is "test.listener.id". Our method then simply logs every message it receives; logger is assumed to be a logger field declared in the class.

That's it!

Now just start your webapp. If your app has registered with Kafka correctly, you should see a message like this in the Kafka log:

Assignment received from leader for group [your.group.id] for generation [x].
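The assignment mentioned in this log line is the set of topic partitions handed to each consumer in the group, which is how a group divides the work of consuming a topic. As a toy illustration only (Kafka's real assignment strategies are pluggable and more involved), the sketch below splits a number of partitions into contiguous ranges across the group members. The class name, method name, and consumer names are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of dividing partitions among the consumers of one group.
// This is NOT Kafka's implementation; all names here are hypothetical.
public class ToyGroupAssignment {

    public static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = partitionCount / consumers.size();   // partitions every consumer gets
        int extra = partitionCount % consumers.size();  // leftovers go to the first consumers
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int share = base + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < share; p++) {
                partitions.add(next++);
            }
            assignment.put(consumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {consumer-a=[0, 1, 2], consumer-b=[3, 4]}
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 5));
    }
}
```

Starting a second instance of the app with the same group id would, in this simplified picture, cause the partitions to be divided between the two instances instead of both receiving every message.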

To test whether your listener works, you can use the console producer program that ships with Kafka to send messages to your topic. If everything is set up correctly, they should be printed in your app log.
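One caveat when sending test messages that carry a key: the configuration above uses IntegerDeserializer for keys, which expects either no key at all or exactly four bytes in big-endian order. Messages sent without a key are fine, but a text key would fail to deserialize. The following stdlib-only sketch mirrors that encoding in simplified form; the class IntegerKeyBytes is hypothetical and only illustrates the byte layout, it is not the Kafka class itself.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the 4-byte big-endian layout used for Integer keys.
public class IntegerKeyBytes {

    // Encodes an int as 4 bytes, most significant byte first
    // (ByteBuffer is big-endian by default).
    public static byte[] encode(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Decodes 4 bytes back into an Integer; a missing key stays null.
    public static Integer decode(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] encoded = encode(42);
        System.out.println(encoded.length + " bytes, decoded back to " + decode(encoded));
    }
}
```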

If you don't see anything printed in your log, make sure your logger is set up correctly.


