Kafka로 채팅을 구현하기 앞서 Kafka에 대해서 공부하기 위해 Kafka를 간단하게 연동해보는 실습을 해보겠습니다.
Kafka 란?
파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트 스트리밍 플랫폼이다.
pub/sub 모델의 메시지 큐 형태로 동작하며 분산환경에 특화되어있다.
- 큐잉 모델
- 브로커(서버) 안에 메시지 큐가 존재한다.
- producer는 큐로 메시지를 보내고, consumer가 큐에서 메시지를 추출한다.
- 하나의 큐에 여러개 컨슈머가 접근할 수 있어서 병렬 처리가 가능하다.
- 하나의 메시지는 하나의 Consumer에서만 처리된다.
- pub/sub 모델
- producer를 publisher, consumer를 subscriber로 명명한다.
- publisher가 subscriber에게 직접 메시지를 보내고 받는 것이 아닌, 브로커를 통해서 전달한다.
- publisher는 누가 수신하는지 알 수 없다.
- 브로커 내의 토픽이라는 카테고리에 등록한다.
- subscriber는 특정 토픽만을 선택해서 읽을 수 있고, 같은 토픽을 구독한 subscriber는 동일한 메시지를 받는다.
Kafka의 주요 특징
각 특징에 대한 설명은 따로 포스팅을 쓰겠다.
- 높은 처리량과 낮은 지연시간
- 높은 확장성
- 고가용성
- 내구성
- 개발 편의성
- 운영 및 관리 편의성
Kafka 용어
- Producer
- kafka에서 메시지를 생성하고, 해당 메시지를 카프카 클러스터에 전송하는 애플리케이션
- 한개, 혹은 여러개의 토픽으로 메시지를 보낼 수 있다.
- Consumer
- 카프카에서 메시지를 읽는 애플리케이션
- 한개, 혹은 여러개의 토픽에서 메시지를 가져올 수 있다.
- 파티션마다 한 컨슈머만 데이터를 가져갈 수 있다.
- broker
- 카프카 애플리케이션이 설치된 서버를 의미한다.
- 브로커를 추가하여 처리량 향상이 가능하다.
- zookeeper
- 브로커 health check 등 카프카 클러스터의 메타 데이터 관리 및 클러스터의 다양한 관리 작업을 수행한다.
- topic
- 카프카는 토픽이라는 곳에 데이터를 저장한다.
- 이메일 주소로 비유할 수 있다.
- 토픽의 이름은 카프카 내에서 고유하다.
- partition
- 토픽의 병렬처리를 위해 여러개의 파티션이라는 단위로 나뉜다.
- 카프카에서는 파티셔닝을 통해 단 하나의 토픽이라도 높은 처리량을 수행할 수 있다.
- 파티션 번호는 0부터 시작한다.
- 브로커 수 = 파티션 수를 맞추는 것이 성능에 좋다.
- 서비스 중에 파티션을 추가로 늘리는 것은 가능하지만 줄이는 것은 불가능하다. (토픽 삭제를 해야하므로 신중해야한다. 모니터링하면서 조절하는 것을 권장)
- offset
- 파티션의 메시지가 저장되는 위치
- 순차적으로 증가하는 숫자 (64비트의 정수)형태이다.
프로젝트
구성
springboot 3.3.2
java 17
docker 27.0.3
gradle dependencies
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
도커로 kafka 실행 하기
docker 설치
https://docs.docker.com/desktop/install/mac-install/
저는 맥 m3를 사용하고 있으므로 Docker Desktop for Mac with Apple silicon으로 설치하였습니다.
docker-compose.yml 작성
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose.yml 실행
docker-compose -f kafka-compose.yml up
위 명령어를 실행하고 설치한 docker desktop에서 실행 중인지 확인할 수 있다.
(명령어로 확인 가능)
카프카 예제 코드 작성
KafkaConsumerConfig
@Configuration
@EnableKafka // Kafka 리스터 어노테이션 활성화, Kafka 리스너가 Spring 컨텍스트에서 작동하도록 한다.
public class KafkaConsumerConfig {
// Kafka Consumer를 생성하는 팩토리
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 오프셋을 찾을 수 없을 때 가장 최신의 메시지부터 읽기 시작하도록 설정
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 아래 두줄 코드는 토픽에 대해서 auto commit으로 100초로 간격을 설정
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100000);
return new DefaultKafkaConsumerFactory<>(config);
}
// Kafka 리스터 컨테이너 팩토리 정의, Kafka 리스너를 컨테이너화하여 실행
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {
// Kafka producer를 생성하는 팩토리
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
// KafkaTemplate을 생성하여 Kafka 프로듀서를 사용하여 메시지를 보낼 수 있도록 한다.
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducerController
@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaProducerController {
private final TestProducer testProducer;
@PostMapping("/sending")
public void create() {
testProducer.create();
}
}
TestProducer
@Component
@RequiredArgsConstructor
public class TestProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void create() {
kafkaTemplate.send("topic", "say hello~!!!!");
}
}
TestConsumer
@Component
public class TestConsumer {
@KafkaListener(topics = "topic", groupId = "group_1")
public void listen(String message) {
System.out.println("Received Messasge in group group_1: " + message);
}
}
테스트
저는 포트만 다르게 두개의 서버를 띄워놓고, 1개의 서버에는 위 코드 대로 groupId를 group_1로 했고, 다른 서버에는 group_2로 설정해서 테스트했습니다.
localhost 8080서버
localhost 8081 서버
추가로 8080 서버만 가동시키고 8081은 가동시키지 않은 상태에서 메시지를 보냈을 때
다시 8081서버를 가동시키면 보내지 못했던 메시지를 받게 된다. (Consumer Config에서 auto offset reset config 부분)
이는 Kafka의 메시지 영속성 때문인 것으로 보인다. 메시지를 브로커의 디스크에 영구적으로 저장하여 메시지가 토픽에 저장되면 해당 메시지는 구성된 보존 기간 동안 삭제되지 않고 저장한다. (위 프로젝트에서는 보존기간은 따로 설정하진 않았다.)
이로 인해 컨슈머가 메시지를 즉시 처리하지 못하더라도, 나중에 다시 읽을 수 있다.
(혹여나 잘못된 정보가 있거나 누락된 부분이 있다면 댓글달아주시면 감사하겠습니다.)
참고
도서 : 실전 카프카 개발부터 운영까지
https://ssnotebook.tistory.com/entry/Kafka-Kafka%EA%B0%80-%EB%B9%A0%EB%A5%B8-%EC%9D%B4%EC%9C%A0
https://deep-jin.tistory.com/entry/apache-kafka
https://dkswnkk.tistory.com/705
'Back-End' 카테고리의 다른 글
REST API URI 생성 규칙 (0) | 2024.07.26 |
---|---|
멀티 모듈은 뭘까? (0) | 2024.04.22 |
HTTP 상태 코드에 대해서 (1) | 2024.04.12 |
[Spring Boot] 배포 방법 (JAR vs WAR) (0) | 2024.03.21 |
Gradle 의존성 옵션 종류 (0) | 2024.03.20 |