진행중인 프로젝트에 적용하기 전에 간단하게 카프카로 보내는 프로젝트를 만들어 보겠습니다.
1. 프로젝트 생성
start.spring.io에서 gradle, spring boot 2.5.9 버전
dependency에 spring web과 kafka, lombok, websocket을 추가해주세요.
2. 설정 값 생성
클래스 생성하셔서 아래 값을 기입해주세요. 토픽과 그룹 아이디, 주소를 주입해줄거에요.
public class KafkaConstants {
public static final String KAFKA_TOPIC = "kafka";
public static final String GROUP_ID = "test";
public static final String KAFKA_BROKER = "localhost:9092";
}
3. 엔티티 생성
implements로 직렬화가 가능하도록 Serializable을 추가해주세요.
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
private String author;
private String content;
private String timestamp;
}
4. 카프카 설정
카프카는 프로듀서와 컨슈머가 있습니다.
각각 설정 파일을 설정해주어야 객체 형식의 데이터를 직렬화하여 저장할 수 있습니다.
프로듀서를 설정해주겠습니다.
@EnableKafka
@Configuration
public class ProducerConfiguration {
@Bean
public ProducerFactory<String, Message> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}
@Bean
public Map<String, Object> producerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
configurations.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configurations.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return configurations;
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
컨슈머도 설정해줄게요.
@EnableKafka
@Configuration
public class ListenerConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(Message.class));
}
@Bean
public Map<String, Object> consumerConfigurations() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configurations;
}
}
WebSocket 설정 - 나중 Socket.io 연결을 위해서 설정해줄게요.
Endpoint로 연결 주소를 설정해줍니다. ex) localhost:8080/chat으로 connect
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/kafka");
registry.enableSimpleBroker("/topic/");
}
}
5. ConsumerService 작성
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaSampleConsumerService {
private final SimpMessagingTemplate template;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
public void listen(Message message) {
log.info("sending via kafka listener..");
template.convertAndSend("/topic/group", message);
}
}
6. Controller 작성 - send 메소드로 카프카에 메시지를 전달합니다.
@Slf4j
@CrossOrigin
@RestController
@RequestMapping(value = "/kafka")
@RequiredArgsConstructor
public class KafkaSampleProducerController {
private final KafkaTemplate<String, Message> kafkaTemplate;
//producer 부분
@PostMapping
public void sendMessage(@RequestBody Message message) {
message.setTimestamp(LocalDateTime.now().toString());
log.info("Produce message : " + message.toString());
try {
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
//여기서 프론트엔드로 메시지를 전송합니다.
@MessageMapping("/sendMessage")
@SendTo("/topic/group")
public Message broadcastGroupMessage(@Payload Message message) {
return message;
}
}
이제 모두 작성을 마치고 포스트맨으로 메시지 전송을 해보겠습니다.
데이터가 잘 들어갔는지 확인하기 위해 토픽 이름을 가지고 조회를 해보겠습니다.
터미널에 들어가셔서 docker exec -it kafka /bin/sh 입력해주세요.
cd /opt/kafka_2.13-2.8.1/bin 폴더로 이동한뒤에
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka --from-beginning 로 데이터 조회를 할 수 있습니다.
완성된 소스는 깃허브에 올라가 있습니다.
참고 링크
'언어 & 라이브러리 > 카프카' 카테고리의 다른 글
카프카 명령어 정리 (0) | 2022.02.12 |
---|---|
아파치 카프카란? (0) | 2022.02.08 |
댓글