Kafka

Apache Kafka는 LinkedIn에서 개발되어 나중에 Apache Software Foundation에 기부된 분산 이벤트 스트리밍 플랫폼입니다. Kafka는 높은 처리량, 낮은 지연 시간, 내결함성을 갖춘 실시간 데이터 파이프라인과 스트리밍 애플리케이션을 구축하기 위한 기반을 제공한다.

주요 특징

아키텍처 개요

Kafka의 아키텍처는 다음과 같은 주요 구성 요소로 이루어져 있다:

  1. 브로커(Broker): Kafka 서버로, 토픽의 파티션을 저장하고 관리한다.
  2. 주키퍼(ZooKeeper): 브로커 클러스터의 메타데이터를 관리한다(최신 버전에서는 KRaft 모드로 대체 가능).
  3. 프로듀서(Producer): 메시지를 토픽에 발행한다.
  4. 컨슈머(Consumer): 토픽에서 메시지를 구독하고 처리한다.
  5. 토픽(Topic): 메시지가 발행되는 카테고리 또는 피드 이름이다.
  6. 파티션(Partition): 토픽은 여러 파티션으로 나뉘며, 각 파티션은 순서가 보장된 메시지 시퀀스이다.

Kafka의 핵심 개념

  1. 토픽과 파티션
    1. 토픽(Topic) 은 Kafka에서 데이터를 구성하는 가장 기본적인 단위이다. 각 토픽은 하나 이상의 파티션으로 나뉜다.
    2. 파티션(Partition) 은 토픽을 여러 서버에 분산시키는 메커니즘을 제공한다. 각 파티션은 변경 불가능한 메시지 시퀀스로, 각 메시지는 파티션 내에서 고유한 오프셋(offset)을 가진다.
  2. 복제(Replication)
    Kafka는 내결함성을 위해 파티션을 여러 브로커에 복제한다. 각 파티션에는 리더(leader)와 팔로워(follower)가 있다:
    • 리더: 모든 읽기 및 쓰기 요청을 처리한다.
    • 팔로워: 리더의 데이터를 복제하고, 리더가 실패할 경우 새로운 리더가 될 수 있다.
  3. 프로듀서와 컨슈머
    1. **프로듀서(Producer)**는 메시지를 토픽에 게시한다.
      주요 특징:
      • 메시지 키(key)를 기반으로 특정 파티션에 메시지 할당
      • 비동기 또는 동기 전송 지원
      • 배치 처리를 통한 성능 최적화
    2. **컨슈머(Consumer)**는 토픽에서 메시지를 소비합니다.
      주요 특징:
      • 컨슈머 그룹을 통한 병렬 처리
      • 오프셋 관리를 통한 메시지 위치 추적
      • 최소 한 번, 최대 한 번, 정확히 한 번 전달 의미 구현 가능
  4. 컨슈머 그룹과 오프셋
    1. **컨슈머 그룹(Consumer Group)**은 동일한 토픽을 소비하는 컨슈머 집합이다. 각 파티션은 그룹 내 하나의 컨슈머에만 할당된다.
    2. **오프셋(Offset)**은 파티션 내 각 메시지의 위치를 나타내는 순차적인 ID이다. 컨슈머는 처리한 메시지의 오프셋을 추적하여 메시지를 중복 처리하지 않도록 한다.

Kafka 설치 및 기본 구성

단일 노드 설치

개발 환경에서는 단일 노드로 Kafka를 실행할 수 있다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Kafka 다운로드
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0

# ZooKeeper 시작
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Kafka 브로커 시작
bin/kafka-server-start.sh config/server.properties &

클러스터 설정

프로덕션 환경에서는 여러 브로커로 구성된 클러스터를 설정해야 한다.

각 브로커에 대한 server.properties 파일을 수정합니다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 첫 번째 브로커
broker.id=0
listeners=PLAINTEXT://server1:9092
log.dirs=/var/lib/kafka/data

# 두 번째 브로커
broker.id=1
listeners=PLAINTEXT://server2:9092
log.dirs=/var/lib/kafka/data

# 세 번째 브로커
broker.id=2
listeners=PLAINTEXT://server3:9092
log.dirs=/var/lib/kafka/data

Docker 및 Kubernetes 배포

Docker Compose를 사용한 Kafka 배포 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

주요 구성 매개변수

중요한 구성 매개변수들:

프로듀서와 컨슈머 프로그래밍 기초

Java 클라이언트

Maven 의존성 추가:

1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

간단한 프로듀서 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // 프로듀서 속성 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // 프로듀서 인스턴스 생성
        Producer<String, String> producer = new KafkaProducer<>(props);
        
        // 메시지 전송
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "Hello Kafka " + i;
            
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("my-topic", key, value);
            
            // 비동기 전송
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("Sent record: " + 
                            "Topic: " + metadata.topic() + ", " +
                            "Partition: " + metadata.partition() + ", " +
                            "Offset: " + metadata.offset()
                        );
                    }
                }
            });
        }
        
        // 자원 해제
        producer.close();
    }
}

간단한 컨슈머 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // 컨슈머 속성 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        
        // 컨슈머 인스턴스 생성
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // 토픽 구독
        consumer.subscribe(Arrays.asList("my-topic"));
        
        // 메시지 폴링 및 처리
        try {
            while (true) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                        "Received record: " +
                        "Topic: " + record.topic() + ", " +
                        "Partition: " + record.partition() + ", " +
                        "Offset: " + record.offset() + ", " +
                        "Key: " + record.key() + ", " +
                        "Value: " + record.value()
                    );
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 자원 해제
            consumer.close();
        }
    }
}

Python 클라이언트

Python 패키지 설치:

1
pip install kafka-python

Python 프로듀서 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from kafka import KafkaProducer
import json

# 프로듀서 생성
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 메시지 전송
for i in range(10):
    data = {'number': i, 'message': f'Hello Kafka {i}'}
    future = producer.send('my-topic', value=data)
    # 결과 확인
    try:
        record_metadata = future.get(timeout=10)
        print(f"Sent to {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
    except Exception as e:
        print(f"Error sending message: {e}")

# 모든 메시지가 전송될 때까지 대기
producer.flush()
producer.close()

Python 컨슈머 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from kafka import KafkaConsumer
import json

# 컨슈머 생성
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='my-python-consumer-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# 메시지 소비
for message in consumer:
    print(f"Received: {message.topic} partition {message.partition} offset {message.offset}")
    print(f"Key: {message.key}, Value: {message.value}")

고급 Kafka 기능

스키마 레지스트리

Kafka는 기본적으로 스키마 강제를 제공하지 않지만, Confluent Schema Registry와 함께 사용하여 메시지 포맷을 관리할 수 있다.

Avro 스키마 예제:

1
2
3
4
5
6
7
8
9
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["int", "null"]}
  ]
}

Kafka Streams

Kafka Streams는 Kafka 기반의 스트림 처리 라이브러리로, 스트림 처리 애플리케이션을 쉽게 구축할 수 있게 한다.

Kafka Connect

Kafka Connect는 Kafka와 외부 시스템 간의 데이터 이동을 위한 프레임워크이다.

소스 커넥터 설정 예제(MySQL에서 데이터 가져오기):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "user",
    "connection.password": "password",
    "topic.prefix": "mysql-",
    "table.whitelist": "users,orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "tasks.max": "1"
  }
}

싱크 커넥터 설정 예제(Elasticsearch로 데이터 보내기):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "mysql-users",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "tasks.max": "1"
  }
}

KSQL

KSQL은 Kafka 스트림에 대한 SQL 인터페이스를 제공하는 이벤트 스트리밍 데이터베이스이다.

KSQL 쿼리 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 스트림 생성
CREATE STREAM orders (
    order_id BIGINT,
    customer_id BIGINT,
    product_id BIGINT,
    quantity INT,
    price DECIMAL(10, 2)
) WITH (
    KAFKA_TOPIC='order-events',
    VALUE_FORMAT='JSON'
);

-- 테이블 생성
CREATE TABLE product_sales AS
SELECT
    product_id,
    SUM(quantity * price) AS total_sales
FROM orders
GROUP BY product_id;

-- 쿼리 실행
SELECT * FROM product_sales WHERE total_sales > 1000;

최적화 및 운영

  1. 프로듀서 성능 튜닝
    배치 설정:

    • batch.size: 한 번에 전송할 레코드 배치의 크기
    • linger.ms: 배치를 전송하기 전에 대기할 시간
      압축:
    • compression.type: 메시지 압축 유형(none, gzip, snappy, lz4, zstd)
      버퍼 및 재시도:
    • buffer.memory: 전송 대기 중인 레코드에 사용할 버퍼 크기
    • retries: 일시적인 오류에 대한 재시도 횟수
    • retry.backoff.ms: 재시도 간격
  2. 컨슈머 성능 튜닝
    폴링 및 처리:

    • max.poll.records: 단일 poll()에서 반환되는 최대 레코드 수
    • fetch.min.bytes: 요청당 가져올 최소 데이터 양
    • fetch.max.wait.ms: 최소 바이트를 사용할 수 없는 경우 대기 시간
      커밋 설정:
    • enable.auto.commit: 자동 오프셋 커밋 활성화 여부
    • auto.commit.interval.ms: 자동 커밋 간격
  3. 브로커 성능 튜닝
    디스크 성능:

    • 고성능 SSD 사용
    • RAID 구성보다 JBOD(Just a Bunch of Disks) 선호
    • log.flush.interval.messageslog.flush.interval.ms 조정
      JVM 튜닝:
    • 적절한 힙 크기 설정
    • G1GC 가비지 컬렉터 사용
      네트워크 설정:
    • num.network.threads: 네트워크 요청을 처리하는 스레드 수
    • num.io.threads: 디스크 I/O를 처리하는 스레드 수
  4. 모니터링 및 관리
    JMX 메트릭:

    • 생산자, 소비자, 브로커 메트릭 모니터링
    • Prometheus, Grafana, Datadog 등의 도구 활용
      로그 및 경고:
    • 브로커 로그 모니터링
    • 중요 메트릭에 대한 경고 설정
      관리 도구:
    • Kafka Manager(CMAK)
    • Confluent Control Center
    • Kafdrop

장애 처리 및 복구 전략

  1. 브로커 장애
    리더 선출: 브로커가 실패하면 해당 브로커가 리더였던 파티션은 ISR(In-Sync Replicas)에서 새 리더를 선출한다.
    브로커 복구 단계:
    1. 실패한 브로커를 재시작
    2. 자동으로 클러스터에 재참여
    3. 파티션 데이터 동기화
    4. 리더십 재분배
  2. 컨슈머 장애
    컨슈머 리밸런싱: 컨슈머가 실패하면 그룹 코디네이터가 리밸런싱을 시작하여 파티션을 나머지 컨슈머에게 재할당한다.
    예외 처리 전략:
    • 일시적인 오류: 재시도
    • 영구적인 오류: 데드레터(Dead Letter) 큐에 메시지 전달
    • 심각한 오류: 알림 및 수동 개입
  3. 데이터 내구성 보장
    최소 동기화 복제본: min.insync.replicas 설정으로 쓰기 작업의 내구성을 보장한다.
    메시지 전달 의미:
    • acks=0: 전송 확인 없음(최소 내구성)
    • acks=1: 리더만 확인(중간 내구성)
    • acks=all: 모든 ISR이 확인(최대 내구성)
  4. 재해 복구
    다중 데이터 센터 복제: MirrorMaker 2.0을 사용하여 데이터 센터 간 데이터 복제를 구현합니다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# source-connect.properties
bootstrap.servers=kafka1:9092,kafka2:9092
group.id=mirror-group
config.storage.topic=mm2-configs
offset.storage.topic=mm2-offsets
status.storage.topic=mm2-status

# target-connect.properties
bootstrap.servers=kafka3:9092,kafka4:9092
group.id=mirror-group
config.storage.topic=mm2-configs
offset.storage.topic=mm2-offsets
status.storage.topic=mm2-status

# mm2.properties
clusters=source, target
source.bootstrap.servers=kafka1:9092,kafka2:9092
target.bootstrap.servers=kafka3:9092,kafka4:9092
source->target.enabled=true
source->target.topics=.*
tasks.max=10

실무 적용 사례 및 디자인 패턴

이벤트 소싱 패턴

**이벤트 소싱(Event Sourcing)**은 상태 변경을 이벤트 시퀀스로 저장하는 패턴이다.

구현 단계:

  1. 도메인 이벤트 정의
  2. 이벤트를 Kafka 토픽에 저장
  3. 이벤트를 처리하여 현재 상태 구성
  4. 필요시 이벤트를 재생하여 상태 재구성

CQRS 패턴

**CQRS(Command Query Responsibility Segregation)**는 명령(쓰기)과 쿼리(읽기)를 분리하는 패턴이다.

Kafka를 활용한 CQRS 구현:

  1. 명령을 Kafka 토픽으로 전송
  2. 이벤트 처리기가 명령을 처리하고 상태 변경
  3. 상태 변경 이벤트를 Kafka 토픽에 게시
  4. 읽기 모델 프로젝션이 이벤트를 소비하여 읽기 전용 데이터베이스 업데이트
  5. 쿼리는 읽기 전용 데이터베이스에서 처리

데이터 파이프라인 구축

실시간 데이터 파이프라인 아키텍처:

  1. 소스 시스템(데이터베이스, 애플리케이션 등)에서 Kafka Connect로 데이터 수집
  2. Kafka 토픽에서 데이터 스트림 처리(Kafka Streams 또는 KSQL)
  3. 처리된 데이터를 분석 시스템(Elasticsearch, Hadoop 등)으로 전달
  4. 대시보드 또는 알림 시스템에서 결과 시각화

마이크로서비스 통신

Kafka를 활용한 마이크로서비스 통신 패턴:

이벤트 기반 통신:

요청-응답 패턴:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 요청 토픽에 메시지 전송
String correlationId = UUID.randomUUID().toString();
ProducerRecord<String, String> record = 
    new ProducerRecord<>("request-topic", correlationId, requestData);
producer.send(record);

// 응답 토픽에서 특정 상관 ID의 메시지만 소비
consumer.subscribe(Arrays.asList("response-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        if (record.key().equals(correlationId)) {
            // 응답 처리
            return record.value();
        }
    }
}

Kafka 보안

인증

SSL/TLS 인증:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# broker 설정
listeners=SSL://localhost:9093
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required

# 클라이언트 설정
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=truststore-password
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password

SASL 인증: Kafka는 다양한 SASL 메커니즘을 지원한다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 브로커 설정
listeners=SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS 구성
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_user1="user1-secret";

# 클라이언트 설정
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="user1" \
    password="user1-secret";

인가

Kafka는 ACL(Access Control Lists)을 통해 리소스 접근 제어를 제공한다.

ACL 설정:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 토픽에 대한 읽기 권한 부여
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
    --add --allow-principal User:user1 \
    --operation Read \
    --topic my-topic

# 컨슈머 그룹에 대한 권한 부여
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
    --add --allow-principal User:user1 \
    --operation Read \
    --group my-consumer-group

# 프로듀서에 대한 쓰기 권한 부여
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
    --add --allow-principal User:user2 \
    --operation Write \
    --topic my-topic

중앙화된 권한 관리: Confluent의 RBAC(Role-Based Access Control)을 사용하여 보다 세분화된 권한 관리가 가능하다.

암호화

전송 중 암호화:

저장 데이터 암호화:

Kafka 배포 및 운영 모범 사례

  1. 클러스터 사이징
    브로커 수 결정 요소:

    • 처리해야 할 데이터 양
    • 필요한 처리량
    • 원하는 복제 인자
    • 내결함성 요구사항
      클러스터 크기 산정 공식:
    1
    
    필요한 브로커 수 = (데이터 처리량 × 보존 기간 × 복제 인자) / 브로커당 디스크 용량
    

    파티션 수 결정 요소:

    • 토픽당 처리량
    • 컨슈머 병렬 처리 수준
    • 메시지 순서 요구사항

    경험적 지침:

    • 파티션 수는 컨슈머 스레드 수의 2-3배
    • 토픽당 파티션 수는 브로커 수의 배수가 되도록 설정
    • 파티션 수가 너무 많으면 성능 저하와 관리 오버헤드 발생
  2. 프로덕션 배포 체크리스트
    하드웨어 요구사항:

    • 디스크: 고성능 SSD (RAID보다 JBOD 선호)
    • 메모리: 최소 32GB (힙 크기는 보통 최대 32GB)
    • CPU: 멀티코어 프로세서 (I/O와 네트워크 작업에 유리)
    • 네트워크: 최소 10 Gbps NIC

    OS 및 JVM 설정:

    • 파일 설명자 제한 증가 (ulimit -n)
    • 스왑 비활성화 또는 최소화
    • G1GC 가비지 컬렉터 사용
    • 적절한 힙 크기 설정

    중요 구성 매개변수:

    • log.retention.hours: 로그 보존 시간
    • log.retention.bytes: 로그 보존 크기
    • log.segment.bytes: 로그 세그먼트 크기
    • num.partitions: 기본 파티션 수
    • num.recovery.threads.per.data.dir: 서버 시작 시 복구 스레드 수
    • default.replication.factor: 기본 복제 인자
    • min.insync.replicas: 최소 동기화 복제본 수
  3. 운영 모범 사례
    백업 전략:

    • 정기적인 토픽 백업 (예: MirrorMaker)
    • 브로커 구성 파일 백업
    • 재해 복구 계획 수립

    모니터링 주요 메트릭:

    • 브로커 수준:
      • 활성 컨트롤러 수
      • 요청 핸들러 유휴 비율
      • 네트워크/디스크 처리량
      • JVM 메모리 사용량
    • 토픽/파티션 수준:
      • 복제 지연(Replication Lag)
      • ISR 축소율
      • 메시지 처리량
      • 파티션 불균형 지표
    • 클라이언트 수준:
      • 생산/소비 지연 시간
      • 요청 속도
      • 오류율

    롤링 업그레이드 절차:

    1. 새 버전 호환성 확인
    2. 구성 변경사항 검토
    3. 테스트 환경에서 검증
    4. 브로커를 한 번에 하나씩 업그레이드
    5. 각 브로커 업그레이드 후 클러스터 상태 확인
  4. 토픽 관리 전략
    토픽 설계 고려사항:

    • 메시지 크기와 볼륨
    • 보존 요구사항
    • 읽기/쓰기 패턴
    • 순서 보장 요구사항

    토픽 명명 규칙:

    1
    
    <환경>.<팀/서비스>.<데이터 유형>
    

    예: prod.users.registration, dev.payments.transactions

    토픽 구성 모범 사례:

    • 토픽별 파티션 수 최적화
    • 토픽별 보존 기간 설정
    • 토픽별 압축 설정

Kafka 에코시스템 및 통합

실무적 조언 및 일반적인 함정

  1. 성공적인 Kafka 도입을 위한 조언

    1. 점진적 접근:
      • 작은 프로젝트부터 시작
      • 경험을 쌓은 후 확장
      • 반복적인 아키텍처 개선
    2. 프로젝트 설계 시 고려사항:
      • 메시지 형식 및 스키마 표준화
      • 토픽 명명 규칙 수립
      • 명확한 책임 분리 및 소유권 정의
    3. 조직 전략:
      • 충분한 교육 및 지식 공유
      • 전담 Kafka 운영 팀 구성 고려
      • 내부 모범 사례 문서화
  2. 일반적인 함정과 해결 방법

    1. 과도한 파티션 수:
      • 문제: 파티션이 너무 많으면 성능 저하와 리밸런싱 시간 증가
      • 해결: 실제 처리량 요구사항에 맞춰 파티션 수 최적화
    2. 부적절한 키 선택:
      • 문제: 편향된 키 분포로 인한 핫 파티션
      • 해결: 균등하게 분포된 키 선택 또는 사용자 정의 파티셔너 구현
    3. 불충분한 모니터링:
      • 문제: 문제 발생 시 원인 파악 어려움
      • 해결: 포괄적인 모니터링 및 알림 시스템 구축
    4. 잘못된 소비자 오프셋 관리:
      • 문제: 메시지 중복 처리 또는 유실
      • 해결: 명시적인 오프셋 커밋 전략 및 멱등성 구현
  3. 성능 문제 해결 가이드

    • 높은 지연 시간 문제:
      1. 디스크 I/O 병목 확인
        1. 크 지연 시간 측정
        1. 일시 중지 모니터링
        1. 설정 최적화
    • 처리량 문제:
      1. 파티션 수 확인 및 조정
      2. 압축 설정 검토
      3. 클라이언트 배치 설정 최적화
      4. 하드웨어 리소스 확인
    • 불균형 클러스터:
      1. 토픽 파티션 할당 확인
      2. 리더 파티션 재분배 실행
      3. 사용자 정의 파티션 할당 전략 고려

Kafka의 미래와 최신 동향

KRaft 모드 (ZooKeeper 제거)

Kafka 3.0부터 ZooKeeper 의존성을 제거하고 Kafka Raft 메타데이터 모드를 도입했다.

KRaft 모드의 장점:

구성 방법:

1
2
3
4
# server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093,2@localhost:9093,3@localhost:9093

Tiered Storage

계층형 스토리지는 핫 데이터와 콜드 데이터를 구분하여 비용 효율적으로 관리한다.

장점:

Kafka Improvement Proposals(KIPs)

주목할 만한 최근 KIP:

산업 동향

실시간 데이터 파이프라인:

이벤트 중심 아키텍처:

Edge Computing:


용어 정리

용어설명

참고 및 출처

공식 문서 및 튜토리얼

실습 및 단계별 튜토리얼

온라인 강의 및 교육

커뮤니티 및 Q&A

참고 도서 및 자료