// MyKafkaListener.java
package com.ambrosiaandrade.pets.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Kafka consumer component, registered only when the {@code dev} profile is active,
 * that records every successfully processed payload in an in-memory list.
 *
 * <p>Two listener methods are declared on the same topic ({@code app.kafka.topic}) and
 * the same consumer group ({@code spring.kafka.consumer.group-id}):
 * <ul>
 *   <li>{@link #consume(String)} logs and swallows failures (no retry);</li>
 *   <li>{@link #consumeWithRetry(String)} lets failures propagate so that the
 *       {@code @RetryableTopic} configuration can take effect.</li>
 * </ul>
 *
 * <p>NOTE(review): because both listeners share one group id, Kafka will hand each
 * record to only one of them rather than to both. Confirm this split is intended;
 * if every record should reach both methods they need distinct group ids.
 */
@Slf4j
@Profile("dev")
@Component
public class MyKafkaListener {

    /**
     * Payloads that were processed without error, in arrival order.
     * A {@link CopyOnWriteArrayList} is used so appends and reads need no external locking.
     */
    private final List<String> messages = new CopyOnWriteArrayList<>();

    /**
     * Consumes a record without any retry: a payload containing {@code "error"} is
     * treated as a failure, anything else is stored.
     *
     * <p>Failures are logged and deliberately not rethrown, so the listener returns
     * normally for every record.
     *
     * @param message the record payload
     */
    @KafkaListener(
            topics = "${app.kafka.topic}",
            groupId = "${spring.kafka.consumer.group-id}")
    public void consume(String message) {
        try {
            if (message.contains("error")) {
                throw new RuntimeException("Failed processing message");
            }
            processMessage(message);
            log.info("[Kafka_consume] Consumed: {}", message);
        } catch (Exception e) {
            // Best-effort listener: report the failure and move on, nothing is retried here.
            log.error("[Kafka_consume] Failed to process message: {}", message, e);
        }
    }

    /**
     * Consumes a record with non-blocking retries: a payload containing {@code "retry"}
     * is treated as a temporary failure, anything else is stored.
     *
     * <p>The annotation configures 3 attempts with a 2000 ms delay and index-suffixed
     * retry topics. The failure is logged and then rethrown: the retry mechanism only
     * reacts to an exception leaving this method, so swallowing it here would silently
     * disable the retries.
     *
     * @param message the record payload
     * @throws RuntimeException when processing fails, to hand the record to the retry flow
     */
    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 2000),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
    )
    @KafkaListener(
            topics = "${app.kafka.topic}",
            groupId = "${spring.kafka.consumer.group-id}")
    public void consumeWithRetry(String message) {
        try {
            if (message.contains("retry")) {
                throw new RuntimeException("Temporary failure");
            }
            processMessage(message);
            log.info("[Kafka_consumeWithRetry] Consumed: {}", message);
        } catch (RuntimeException e) {
            log.error("[Kafka_consumeWithRetry] Failed to process message: {}", message, e);
            // Propagate so the container forwards the record to the next retry topic.
            throw e;
        }
    }

    /**
     * Records a payload as processed by appending it to {@link #messages}.
     *
     * @param message the payload to store
     */
    private void processMessage(String message) {
        messages.add(message);
    }

    /**
     * Returns the payloads processed so far.
     *
     * <p>This is the live backing list, not a copy: later records appear in it, and
     * modifications made by the caller affect this listener's state.
     *
     * @return the list of processed payloads, in arrival order
     */
    public List<String> getMessages() {
        return messages;
    }
}