KafkaController.java
package com.ambrosiaandrade.pets.controller;
import com.ambrosiaandrade.pets.listener.MyKafkaListener;
import com.ambrosiaandrade.pets.service.KafkaConsumerService;
import com.ambrosiaandrade.pets.service.KafkaProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@Tag(name = "5. Kafka Operations", description = "Operations related to Kafka messaging, including producing and consuming messages.")
@Slf4j
@Profile("dev")
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private KafkaProducerService kafkaProducerService;
private KafkaConsumerService kafkaConsumerService;
private MyKafkaListener listener;
private static final String STATUS = "status";
private static final String MESSAGE = "message";
public KafkaController(KafkaProducerService kafkaProducerService, KafkaConsumerService kafkaConsumerService, MyKafkaListener listener) {
this.kafkaProducerService = kafkaProducerService;
this.kafkaConsumerService = kafkaConsumerService;
this.listener = listener;
}
@Operation(
summary = "Produce a message to Kafka",
description = "Sends a message to a Kafka topic."
)
@Parameter(
name = "message",
description = "The message to send to Kafka. Default is 'Hello kafka'.",
required = false,
example = "Hello kafka"
)
@ApiResponses(value = {
@ApiResponse(
responseCode = "200",
description = "Message sent successfully",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = String.class))
),
@ApiResponse(
responseCode = "500",
description = "Internal server error",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = String.class))
)
})
@GetMapping("/producer")
public ResponseEntity<Map<String, String>> kafkaProduce(@RequestParam(defaultValue = "Hello kafka") String message, @RequestParam(required = false, defaultValue = "false") boolean retry) {
kafkaProducerService.send(message, retry);
Map<String, String> response = new HashMap<>();
response.put(STATUS, "Message sent");
response.put(MESSAGE, message);
return ResponseEntity.ok(response);
}
@Operation(
summary = "Retrieves all messages from Kafka",
description = "Retrieves all the messages from a Kafka topic, from the beginning."
)
@ApiResponses(value = {
@ApiResponse(
responseCode = "200",
description = "Messages consumed successfully",
content = @Content(
mediaType = "application/json",
array = @ArraySchema(schema = @Schema(implementation = String.class))
)
),
@ApiResponse(
responseCode = "500",
description = "Internal server error",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = String.class))
)
})
@GetMapping("/consumer")
public ResponseEntity<Map<String, String>> kafkaConsume() {
Map<String, String> response = new HashMap<>();
response.put(STATUS, "Message consumed");
response.put(MESSAGE, listener.getMessages().toString());
return ResponseEntity.ok(response);
}
@Operation(
summary = "Consume messages from Kafka",
description = "Retrieves all the messages from a Kafka topic."
)
@ApiResponses(value = {
@ApiResponse(
responseCode = "200",
description = "Messages consumed successfully",
content = @Content(
mediaType = "application/json",
array = @ArraySchema(schema = @Schema(implementation = String.class))
)
),
@ApiResponse(
responseCode = "500",
description = "Internal server error",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = String.class))
)
})
@GetMapping("/consumer/{number}")
public ResponseEntity<Map<String, String>> kafkaConsume(@PathVariable int number) {
Map<String, String> response = new HashMap<>();
response.put(STATUS, "Message consumed");
response.put(MESSAGE, kafkaConsumerService.fetchMessagesFromKafka(number).toString());
return ResponseEntity.ok(response);
}
}