Apache – Kafka를 통한 이벤트 브로커 시스템 구축 및 활용

Apache Kafka란?

분산 스트리밍 플랫폼으로, 주로 대규모의 실시간 데이터 스트림을 처리하는 데 사용됩니다. Kafka는 메시지 큐 시스템, 로그 처리 시스템, 그리고 실시간 데이터 파이프라인 등 다양한 용도로 사용될 수 있음

LinkedIn에서 개발되었고, 이후, Apache Software Foundation에 의해 관리되고 있음

Apache Kafka의 주요 특징

분산 시스템: Kafka는 분산형 아키텍처를 기반으로 하며, 수많은 프로듀서(producer), 소비자(consumer), 그리고 브로커(broker)들이 협력하여 데이터를 처리함. 이 분산 구조 덕분에 높은 확장성과 내결함성(fault tolerance)을 제공함

  1. 고속 데이터 처리: Kafka는 매우 빠른 데이터 전송 속도를 자랑하며, 초당 수백만 개의 메시지를 처리할 수 있음. 이는 대규모의 실시간 데이터 처리 시스템에서 중요한 성능 요소임.
  2. 내결함성(Fault Tolerance): Kafka는 데이터의 복제(replication) 기능을 통해 내결함성을 제공함. 각 데이터는 복제본을 여러 노드에 저장할 수 있어, 일부 노드가 장애를 일으켜도 데이터 손실 없이 지속적으로 서비스할 수 있음.
  3. 내구성(Persistence): Kafka는 메시지를 디스크에 저장하여 내구성을 제공. 메시지는 기본적으로 로그 파일 형태로 저장되며, 디스크에 기록된 메시지는 일정 기간 동안 보존됨.
  4. 고가용성(High Availability): Kafka 클러스터는 여러 브로커 노드로 구성될 수 있으며, 각 메시지는 여러 복제본으로 저장되어 장애가 발생해도 서비스가 중단되지 않도록 설계되었음
  5. 실시간 데이터 스트리밍: Kafka는 데이터 스트림을 실시간으로 처리하고 전달할 수 있기 때문에 실시간 데이터 분석, 이벤트 소싱, 모니터링 시스템 등에서 널리 사용됨.

 

Kafka의 구성 요소

  1. Producer (생산자): Kafka에 데이터를 전송하는 애플리케이션이나 시스템을 말합니다. Producer는 데이터를 Kafka의 특정 토픽(Topic)에 전송.
  2. Consumer (소비자): Kafka에서 데이터를 가져오는 애플리케이션을 말함. Consumer는 Kafka의 토픽을 구독하여 데이터를 소비하고 처리함.
  3. Broker (브로커): Kafka 클러스터 내에서 데이터를 저장하고 처리하는 서버를 말함. 하나의 Kafka 클러스터는 여러 개의 브로커로 구성되며, 각 브로커는 데이터를 토픽 단위로 저장함
  4. Topic (토픽): Kafka에서 메시지를 구분하는 카테고리나 채널. Producer는 메시지를 특정 토픽에 보내며, Consumer는 특정 토픽을 구독하여 메시지를 받음.
  5. Partition (파티션): 각 토픽은 여러 개의 파티션으로 나누어질 수 있음. 파티션은 데이터의 병렬 처리를 가능하게 하여 Kafka의 확장성을 제공함. 각 파티션은 별도의 로그 파일로 저장되며, 파티션을 나누는 것은 데이터의 병렬 처리와 로드 밸런싱을 위해 필요함.
  6. Zookeeper: Kafka는 Zookeeper를 사용하여 클러스터의 메타데이터 관리와 브로커 간의 협동 작업을 관리함. Zookeeper는 클러스터의 브로커 상태와 리더 선출 등을 담당함. 그러나 최근에는 Kafka의 자체 메타데이터 관리로 Zookeeper 의존성을 줄여나가는 방향으로 발전하고 있음.

 

Kafka의 주요 용도

  1. 실시간 데이터 파이프라인: Kafka는 실시간 데이터 스트리밍 시스템을 구축하는 데 매우 유용함. 예를 들어, 웹 애플리케이션의 로그 데이터를 실시간으로 수집하고 분석하는 데 사용될 수 있음.
  2. 이벤트 소싱(Event Sourcing): Kafka는 이벤트 기반 아키텍처에서 이벤트를 처리하는 데 매우 적합함. 이벤트 소싱은 애플리케이션 상태를 이벤트 로그로 저장하고 이를 통해 시스템 상태를 추적하는 방식힘.
  3. 로그 수집 및 처리: Kafka는 대규모의 로그 데이터를 실시간으로 수집하고 처리하는 데 효과적임. 여러 서버에서 생성된 로그 데이터를 Kafka에 전달하고, 이를 실시간으로 처리하여 모니터링, 경고 시스템 등을 구축할 수 있음.
  4. 실시간 분석 및 대시보드: Kafka를 사용하여 실시간 데이터 분석 시스템을 구축할 수 있음. 예를 들어, 웹사이트 트래픽, 사용자 행동 데이터, IoT 센서 데이터를 실시간으로 스트리밍하여 분석할 수 있음.
  5. Microservices 통합: Kafka는 마이크로서비스 아키텍처에서 서비스 간의 통합을 위한 메시징 시스템으로 사용됨. Kafka를 통해 각 서비스가 이벤트 기반으로 통신하고 데이터를 교환할 수 있음.
  6. 데이터 스트리밍: Kafka는 실시간으로 데이터 스트리밍을 처리하는 데 적합함. 예를 들어, 금융 거래 시스템이나 사용자 인터랙션 데이터를 실시간으로 처리하고 다른 시스템에 전달할 수 있음.

 

Kafka의 장점

  1. 높은 성능: Kafka는 초당 수백만 건의 메시지를 처리할 수 있을 정도로 빠른 성능을 제공함.
  2. 확장성: Kafka는 클러스터로 확장할 수 있어, 사용량이 증가하면 클러스터의 크기를 확장하여 성능을 유지할 수 있음.
  3. 내구성 및 신뢰성: Kafka는 데이터를 디스크에 기록하고 복제하여 내구성과 신뢰성을 보장함.
  4. 실시간 처리: 실시간 스트리밍 처리에 적합하여 빠르게 처리해야 하는 데이터에 유용함.
  5. 유연한 소비자 모델: 여러 소비자가 동시에 하나의 토픽을 구독할 수 있어, 다양한 애플리케이션과 서비스를 통합할 수 있음.

Kafka의 단점

  1. 운영 복잡성: Kafka 클러스터를 설정하고 관리하는 것이 다소 복잡할 수 있음. 특히 Zookeeper와의 통합과 클러스터 크기가 커질수록 관리가 어려워질 수 있음.
  2. 메시지 지연: 높은 부하 상황에서는 메시지 전송 지연이 발생할 수 있음.
  3. 자원 소모: Kafka는 메모리와 디스크 공간을 많이 사용하기 때문에, 클러스터 운영 시 자원 관리를 신경 써야 함.

Kafka 서버 구축

Compose 파일을 통한 구동

Docker를 통한 서비스 구축

아래 Docker Compose 파일을 이용하여 다양한 인프라 환경에 맞춰 서버를 간단하게 설치할 수 있다.

https://github.com/conduktor/kafka-stack-docker-compose

# git clone https://github.com/conduktor/kafka-stack-docker-compose.git

위 방법을 통해, Compose 파일을 다운로드 하여, 자신에 맞는 환경의 Kafka를 설치하면 된다.

아래와 같이 zk-single-kafka-single.yml 파일을 통해 단일 인스턴스 형태의 kafka를 설치한다.

# docker compose –f zk-single-kafka-single.yml up –d

위와 같이 설정하게 되면, 기본적인 설정과 같이 구동해야 할 서비스들도 같이 구동된다.

 

실행중인 Process 확인

실행중인 서비스에 대한 프로세스 확인

# docker compose –f zk-single-kafka-single.yml ps

 

Process 시작/중단

서비스를 일시 중단 하거나, 중단된 서비스를 다시 시작

# docker compose –f zk-single-kafka-single.yml start/stop

 

Process 삭제

# docker-compose –f zk-single-kafka-single.yml down

 

Docker 이미지를 통한 직접 구축

그렇지 않고, 직접 Docker Image를 내려 받아 직접 Docker 컨테이너를 구축할 수 도 있다.

다만, 이렇게 Docker 이미지를 이용하여 설치할 경우, 기본적으로 필요한 Docker 명령 도구들은 같이 설치되지 않으므로, 별도로 설치하거나, 외부에서 접속하는 방법을 따라야 한다.

기본 이미지를 통한 구동

# docker run -p 9092:9092 -d –restart=unless-stopped –name kafka apache/kafka:latest

 

GrallVM으로 빌드된 Native 이미지 구동

# docker run -p 9092:9092 -d –restart=unless-stopped –name kafka apache/kafka-native:latest

 

Command Line Tools

Kafka 명령 테스트를 위해서는 Command Line Tool 을 이용하여 수행할 수 있다.

만약 Docker Image를 통해 직접 설치했을 경우는 이미지 내부에 Command Line 명령들이 존재하지 않기 때문에, 별도로 설치해 줘야 한다.

외부에서 명령을 설치한 후, 접속 정보를 통해 접속할 수 있다.

이벤트 저장할 Topic 생성

Topic 생성

# kafka-topics.sh –create –topic [Topic 이름] –bootstrap-server [서버주소]

생성된 Topic 확인

# kafka-topic.sh –describe –topic [Topic 이름] –bootstrap-server [서버주소]

생성된 전체 Topic 리스트 확인

# kafka-topic.sh –bootstrap-server [서버주소] –list

생성된 Topic 정보 확인

# kafka-topics.sh –describe –topic [Topic 이름] –bootstrap-server [서버주소]

이벤트 송수신

이벤트 송신 (Producer)

# kafka-console-producer.sh –topic [Topic 이름] –bootstrap-server [서버주소]

이벤트 수신 (Consumer)

# kafka-console-consumer.sh –topic [Topic 이름] –bootstra-server [서버주소]

Producer의 동작 방식

Procedure는 앞서 설명한 바와 같이 이벤트를 생성하는 명령이다.

Producer는 Consumer가 이벤트를 가져갈 수 있도록 Partition에 데이터를 입력하게 된다. 이때, 파티션을 분배하는 방식은 몇가지 방식이 있다.

키 기반 파티셔닝 (Key-based Partitioning)

메시지에 키가 포함된 경우, 해당 키를 Hash한 후 그 결과값을 이용하여 파티션을 선정함. 이 방식은 동일한 키의 메시지가 항상 같은 파티션에 전송되는 것을 보장하여, 이는 파티션 내 데이터의 일관성을 유지하는데 유리함

메시지 1 (key=user123) => Hash => Partition 0
메시지 2 (key=user123) => Hash => Partition 0
메시지 3 (key=user456) => Hash => Partition 1

Round-robin 기반 파티셔닝

메시지에 키가 없을 경우, Round-robin방식으로 메시지를 순차적으로 파티션에 할당하며, 데이터의 일관성을 깨지지만, 부하 분산에 유리함

메시지 1 => Partition 0
메시지 2 => Partition 1
메시지 2 => Partition 2

Consumer의 동작 방식

Consumer는 Procedure에서 생성한 이벤트를 수신하는 명령이다.

Consumer는 Consumer그룹으로 묶일 수 있다. (특별히 묶이지 않은 Consumer는 단일 Consumer를 가진 Group으로 보면 된다.) 이렇게 묶인 Consumer Group은 대상 Topic의 Partition을 읽어오도록 Consumer에서 요청한다.

순차 처리 방식인 Kafka의 특징은 이 Partition에 의존된다. Partition이 1개라면, 1개의 순차 처리 방식의 명령을 수행하며, N개 라면 N개의 병렬성을 가진 명령 파이프라인이 생성된다고 보면 된다.

Partition과 Consumer의 개수가 동일할 경우

Partition의 개수와 Consumer의 개수가 동일할 경우, Consumer들은 할당된 하나의 Partition만을 처리하게 된다.

Partition 01 => Consumer 01
Partition 02 => Consumer 02
Partition 03 => Consumer 03

Partition이 Consumer 개수보다 많을 경우 (Partition > Consumer)

하나의 Consumer는 1개 이상의 Partition을 할당 받아 처리하게 된다.

Partition 01, 02 => Consumer 01
Partition 03, 04 => Consumer 02
Partition 05 => Consumer 03

Partition 이 Consumer 개수보다 적을 경우, (Partition < Consumer)

Partition을 할당 받지 못한 Consumer은 역할을 수행하지 못하고 대기하게 된다.

Partition 01 => Consumer 01
Partition 02 => Consumer 02
Partition 03 => Consumer 03
대기 => Consumer 04
대기 => Consumer 05

파티션을 나눠서 Topic 생성 방법

파티션을 나눠서 토픽을 생성하는 방법은 다음과 같다.

# kafka-topics.sh –create –bootstrap-server [서버 아이피] –topic [Topic 이름] –partitions [파티션 개수]

복제본 설정

Kafka는 복제본에 대한 설정은 Topic기준으로 설정을 수행해야 한다. 만약 설정하지 않는다면, 기본 복제본은 1개만을 생성하여 브로커 장애 발생 시 데이터 손실이 발생할 수 있다.

복제본은 다음과 같이 설정하여 지정할 수 있으며, 지정된 개수 만큼 적절한 브로커에 자동으로 배치가 된다.

# kafka-topics.sh –create –bootstrap-server [서버주소] –topic [Topic 이름] –partition [파티션 개수] –replication-factor [생성할 복제본 수]

이벤트 수신 방법에 대한 정의

이벤트는 수신 방법 정의에 따라 다른 형태로 수신을 받을 수 있다. 이를테면, 메시지 수신을 Consumer가 수동으로 Commit을 할 때까지 삭제를 지연 시키면, Consumer가 종료되어 수신할 수 없는 상태가 되더라도, 다시 정상화 되어 데이터를 수신 받을 때까지 삭제를 지연시킬 수 있다.

다만 이때도, 서버에 설정된 메시지 보관 시간 까지만 보관되며, 이 시간이 지날 때까지 수신하지 못하면, 삭제되므로, 주의해야 한다.

수신 Timeout 설정

수신 Timeout을 Kafka의 retention.ms을 통해 설정할 수 있다. 이는 토픽을 생성할 때, 인자 값으로 설정할 수 있다.

kafka-topics.sh –create –topic [TOPIC 이름] –bootstrap-server [서버 주소] –config retention.ms= 86400000

Spring boot 상의 구현

의존성 추가

Spring boot에서는 spring.kafka 프로젝트를 통해 kafka를 지원하고 있다. 아래와 같이 build.gradle파일에 의존성을 추가한다.

dependency {
implementation ‘org.springframework.kafka:spring-kafka’
}

사용할 서버와 Topic 설정

YAML 파일을 통한 설정

 

spring:
kafka:
bootstrap-servers: “localhost:9092”
consumer:
group-id: “myGroup”
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

KafkaTemplate을 통한 이벤트 송수신

Kafka 빈 생성

프로젝트를 통해 자동으로 생성된 KafkaTemplate을 아래와 같은 코드를 통해 주입 받아 사용 상태를 설정할 수 있다.

@Component
public class KafkaBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

……
}

송신 메서드 생성

생성된 Bean에서, KafkaTemplate을 통해 송신 설정을 수행한다.

@Component
public class KafkaBean {
public void sendMessage(String message) {
this.kafkaTemplate.send(TOPIC, message);
}

}

수신 메서드 생성

생성된 Bean에서, KafkaTemplate을 통해 수신 설정을 수행한다.

@Component
public class KafkaBean {
@KafkaListener(topics = “someTopic”)
public void processMessage(String content) {
// ……
}

}

특이 조건

기본 동작

기본적으로 Group으로 그룹화 되지 않은 Consumer들은, 자신이 구독하고 있는 TOPIC에 대해서는 모두 한번씩 데이터를 수신하게 된다. 이는 Group 단위로 Offset을 관리하고 있기 때문이다.

 

서버 중 한대만 받도록 설정

Consumer의 Group을 통해 설정할 수 있다. 같은 Group 이름으로 설정된 Consumer들은 해당 그룹에서 대표자 1대에서는 수신이 가능한 상태가 된다.

Kafka-console-consumer.sh

# kafka-console-consumer.sh –topic [TOPIC 이름] –bootstrap-server [서버 주소] –group [그룹 네임]

Spring boot

@KafkaListener(topics = “[Topic]”, groupId = “[Group ID]”)

 

수신 중에, 또 다른 Group 생성 시

Group은 개별 단위로 Offset을 관리하고 있다고 설명했다. 그렇기에 현재까지 발생한 모든 이벤트는, 해당 그룹에서는 새로운 이벤트로 간주된다.

다만, auto-offset-reset 설정에 의해, 새로 수신 받을 메시지의 상태를 지정할 수 있다.

  1. earliest 로 설정했다면, 0번부터 모든 데이터를 재 수신하기를 시도할 것이다.
  2. latest로 설정한다면, 현재 마지막 offset으로 초기화 되며, 기존에 수신된 모든 이벤트 들은 무시되게 된다.

Consumer Group을 설정할 때는 위와 같은 규칙을 이해하고, 적절한 옵션을 설정해야 할 필요가 있다.

 

분배 방식

이벤트 분배 방식은 Partition과 Consumer의 개수에 밀접한 관계를 가지고 있다.

Producer는 기본적으로 Partition 들에게 보내는 이벤트의 Key와 Partition Number을 기초로 설정되어 있는 Partitioner 동작에 맞춰 분배하게 된다.

  1. key, Partition Number 모두 입력되지 않았을 경우.
    설정되어 있는 Partitioner의 기본 동작에 맞춰 분배된다. 기본적으로 Round robin 방식으로 분배가 된다. (아무런 설정이 되어 있지 않을 경우, 여기서 설명한 DefaultPartitioner가 설정되지 않는 것 같다. 이 문제는 뒤에서 설명한다.)
  2. Key가 설정되어 있을 경우
    추가 설정이 되어 있지 않을 경우, Key를 해시 하여 생성된 키 값에 해당하는 파티션에 분배하게 된다. 이는 특정 키가 한 파티션으로 보내어 데이터 일관성을 유지하기 위함이다.
  3. Partition Number가 설정되어 있을 경우
    지정된 파티션 넘버에 요청을 기록한다. 이때, 파티션 넘버가 유효하지 않을 경우, 이벤트 기록 자체가 실패해 버리기 때문에 주의가 필요하다.
  4. Key, Partition Number가 모두 설정되어 있을 경우
    이럴 경우, 우선순위에 따라 Partition Number가 먼저 적용된다.

 

기본 Default Partitioner

지금까지 설명한 바와 같이 아무런 설정이 없고, Key, Partition Number 없이 이벤트를 보낼 경우, Default Partitioner의 동작에 의해 Round robin방식으로 모든 Partition에 순차적으로 메시지를 보내야 한다. 하지만, 직접 해보면 그렇게 동작하지 않는 것을 확인할 수 있다. (버전에 따라 정상 동작 할 수도…)

이는, 설정 상에 기본 Partitioner가 DefaultPartitioner로 설정되지 않는 문제인 것 같다. (확인해봐야 할 문제)

아래 설정으로 DefaultPartitioner가 지정되도록 수정한다.

application.yml

spring.kafka.producer.properties.partitioner.class: org.apache.kafka.clients.producer.internals.DefaultParitioner

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다