본문 바로가기
언어 & 라이브러리/카프카

스프링에서 카프카 시작하기

by illlilillil 2022. 2. 9.

진행중인 프로젝트에 적용하기 전에 간단하게 카프카로 보내는 프로젝트를 만들어 보겠습니다.

 

1. 프로젝트 생성

start.spring.io에서 gradle, spring boot 2.5.9 버전

dependency에 spring web과 kafka, lombok, websocket을 추가해주세요.

 

2. 설정 값 생성

클래스 생성하셔서 아래 값을 기입해주세요. 토픽과 그룹 아이디, 주소를 주입해줄거에요.

public class KafkaConstants {
    public static final String KAFKA_TOPIC = "kafka";
    public static final String GROUP_ID = "test";
    public static final String KAFKA_BROKER = "localhost:9092";
}

 

3. 엔티티 생성

implements로 직렬화가 가능하도록 Serializable을 추가해주세요.

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
    private String author;
    private String content;
    private String timestamp;
}

 

4. 카프카 설정

카프카는 프로듀서와 컨슈머가 있습니다. 

각각 설정 파일을 설정해주어야 객체 형식의 데이터를 직렬화하여 저장할 수 있습니다.

 

프로듀서를 설정해주겠습니다.

@EnableKafka
@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        configurations.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configurations.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configurations;
    }

    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

컨슈머도 설정해줄게요.

@EnableKafka
@Configuration
public class ListenerConfiguration {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations(), new StringDeserializer(), new JsonDeserializer<>(Message.class));
    }

    @Bean
    public Map<String, Object> consumerConfigurations() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        configurations.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);
        configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return configurations;
    }
}

WebSocket 설정 - 나중 Socket.io 연결을 위해서 설정해줄게요.

Endpoint로 연결 주소를 설정해줍니다. ex) localhost:8080/chat으로 connect

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/kafka");
        registry.enableSimpleBroker("/topic/");
    }
}

 

 

5. ConsumerService 작성

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaSampleConsumerService {
    private final SimpMessagingTemplate template;

    @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
    public void listen(Message message) {
        log.info("sending via kafka listener..");
        template.convertAndSend("/topic/group", message);
    }
}

 

6. Controller 작성 - send 메소드로 카프카에 메시지를 전달합니다.

@Slf4j
@CrossOrigin
@RestController
@RequestMapping(value = "/kafka")
@RequiredArgsConstructor
public class KafkaSampleProducerController {

    private final KafkaTemplate<String, Message> kafkaTemplate;

    //producer 부분
    @PostMapping
    public void sendMessage(@RequestBody Message message) {
        message.setTimestamp(LocalDateTime.now().toString());
        log.info("Produce message : " + message.toString());
        try {
            kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    //여기서 프론트엔드로 메시지를 전송합니다.
    @MessageMapping("/sendMessage")
    @SendTo("/topic/group")
    public Message broadcastGroupMessage(@Payload Message message) {
        return message;
    }
}

 

이제 모두 작성을 마치고 포스트맨으로 메시지 전송을 해보겠습니다.

 

데이터가 잘 들어갔는지 확인하기 위해 토픽 이름을 가지고 조회를 해보겠습니다.

터미널에 들어가셔서 docker exec -it kafka /bin/sh 입력해주세요.

cd /opt/kafka_2.13-2.8.1/bin 폴더로 이동한뒤에

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka --from-beginning 로 데이터 조회를 할 수 있습니다.

 

완성된 소스는 깃허브에 올라가 있습니다.

 

GitHub - dhkstnaos/kakfa-test

Contribute to dhkstnaos/kakfa-test development by creating an account on GitHub.

github.com

참고 링크

 

Spring Boot | Kafka를 이용한 채팅 (3) 메시지 주고받기 + ReactJS

1. Controller작성 ✔️kafkaTemplate.send메서드를 통해 메시지가 전송됨 @Slf4j @CrossOrigin @RestController @RequestMapping(value = "/kafka") public class ChatController { @Autowired private KafkaTemp..

gaemi606.tistory.com

 

'언어 & 라이브러리 > 카프카' 카테고리의 다른 글

카프카 명령어 정리  (0) 2022.02.12
아파치 카프카란?  (0) 2022.02.08

댓글