Apache Pulsar#
Apache Pulsar는 Yahoo에서 개발하고 Apache Software Foundation에서 관리하는 분산형 메시징 및 스트리밍 플랫폼이다. Pulsar는 기존의 메시지 큐와 스트리밍 시스템의 장점을 결합하여 높은 성능, 낮은 지연 시간, 그리고 강력한 내구성을 제공한다.
주요 특징#
- 멀티 테넌시(Multi-tenancy): 다양한 팀이나 애플리케이션이 같은 클러스터를 공유할 수 있다.
- 스트리밍과 큐잉의 통합: 전통적인 메시징과 스트리밍 패러다임을 동시에 지원한다.
- 지역 간 복제(Geo-replication): 여러 데이터센터나 지역에 걸쳐 데이터를 복제할 수 있다.
- 계층형 스토리지: 핫 데이터는 메모리나 SSD에, 콜드 데이터는 HDD나 객체 스토리지에 저장한다.
- Functions와 IO 커넥터: 서버리스 컴퓨팅과 데이터 통합 기능을 제공한다.
- 트랜잭션 지원: 정확히 한 번(exactly-once) 메시지 처리를 보장한다.
- 스키마 레지스트리: 메시지의 스키마와 버전 관리를 지원한다.
아키텍처 개요#
Pulsar의 아키텍처는 크게 세 가지 주요 컴포넌트로 구성된다:
- Broker: 클라이언트로부터 메시지를 수신하고 전달하는 역할을 한다.
- BookKeeper: 메시지를 저장하고 관리하는 분산 로그 스토리지 시스템이다.
- ZooKeeper: 메타데이터 관리와 조정을 담당한다.
이러한 구조를 통해 Pulsar는 스토리지와 컴퓨팅을 분리하여 각각을 독립적으로 확장할 수 있는 유연성을 제공한다.
Pulsar의 핵심 개념#
토픽과 서브스크립션#
파티션 토픽(Partitioned Topic)#
높은 처리량을 위해 토픽을 여러 파티션으로 나눌 수 있다. 각 파티션은 독립적인 토픽으로 처리되며, 메시지는 키(key)를 기반으로 파티션에 분배된다.
네임스페이스와 테넌트#
테넌트(Tenant): 조직이나 팀을 나타내는 최상위 단위이다.
네임스페이스(Namespace): 테넌트 내에서 관련된 토픽들을 그룹화하는 단위이다. 네임스페이스 수준에서 정책(TTL, 보존 정책 등)을 설정할 수 있다.
메시지 형식#
Pulsar 메시지는 다음과 같은 구성 요소를 포함할 수 있다:
- 값(Value): 실제 메시지 내용
- 키(Key): 메시지 라우팅에 사용
- 속성(Properties): 사용자 정의 메타데이터
- 이벤트 시간(Event Time): 메시지가 생성된 시간
- 스키마(Schema): 메시지의 구조를 정의
Pulsar 설치 및 기본 구성#
독립 실행형(Standalone) 모드 설치#
개발 및 테스트 환경에서는 Pulsar를 독립 실행형 모드로 실행할 수 있다. 이 모드에서는 ZooKeeper, BookKeeper, Broker가 모두 하나의 프로세스에서 실행된다.
1
2
3
4
5
6
7
| # Pulsar 다운로드
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
tar -xf apache-pulsar-2.10.1-bin.tar.gz
cd apache-pulsar-2.10.1
# Standalone 모드 시작
bin/pulsar standalone
|
클러스터 모드 설치#
프로덕션 환경에서는 고가용성과 확장성을 위해 클러스터 모드로 Pulsar를 배포해야 한다. 이를 위해서는 여러 ZooKeeper, BookKeeper, Broker 인스턴스를 설정해야 한다.
기본적인 3노드 클러스터 구성:
- 3개의 ZooKeeper 노드
- 3개 이상의 BookKeeper 노드
- 3개 이상의 Broker 노드
Docker 및 Kubernetes 배포#
Docker 및 Kubernetes를 사용한 배포도 가능하다.
Docker Compose를 사용한 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| version: '3'
services:
zookeeper:
image: apachepulsar/pulsar:2.10.1
command: bin/pulsar zookeeper
ports:
- "2181:2181"
bookie:
image: apachepulsar/pulsar:2.10.1
command: bin/pulsar bookie
depends_on:
- zookeeper
broker:
image: apachepulsar/pulsar:2.10.1
command: bin/pulsar broker
ports:
- "8080:8080"
- "6650:6650"
depends_on:
- zookeeper
- bookie
|
주요 구성 파일#
- broker.conf: Broker 설정
- bookkeeper.conf: BookKeeper 설정
- client.conf: 클라이언트 설정
- pulsar_env.sh: 환경 변수 설정
중요한 구성 매개변수들:
brokerServicePort
: Broker 서비스 포트 (기본값: 6650)webServicePort
: Admin API 포트 (기본값: 8080)managedLedgerDefaultEnsembleSize
: BookKeeper 앙상블 크기managedLedgerDefaultWriteQuorum
: 쓰기 정족수managedLedgerDefaultAckQuorum
: 확인 정족수
프로듀서와 컨슈머 프로그래밍 기초#
Java 클라이언트#
Maven 의존성 추가:
1
2
3
4
5
| <dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.1</version>
</dependency>
|
간단한 프로듀서 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| import org.apache.pulsar.client.api.*;
public class SimplePulsarProducer {
public static void main(String[] args) throws Exception {
// Pulsar 클라이언트 생성
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 프로듀서 생성
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
// 메시지 전송
for (int i = 0; i < 10; i++) {
String message = "Hello Pulsar " + i;
producer.send(message);
System.out.println("Sent message: " + message);
}
// 자원 해제
producer.close();
client.close();
}
}
|
간단한 컨슈머 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| import org.apache.pulsar.client.api.*;
public class SimplePulsarConsumer {
public static void main(String[] args) throws Exception {
// Pulsar 클라이언트 생성
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 컨슈머 생성
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
// 메시지 수신
while (true) {
Message<String> msg = consumer.receive();
try {
String content = msg.getValue();
System.out.println("Received message: " + content);
// 메시지 확인(ack)
consumer.acknowledge(msg);
} catch (Exception e) {
// 메시지 처리 실패 시 부정 확인(nack)
consumer.negativeAcknowledge(msg);
}
}
}
}
|
Python 클라이언트#
Python 패키지 설치:
1
| pip install pulsar-client
|
Python 프로듀서 예제:
1
2
3
4
5
6
7
8
9
10
11
| import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
message = f'Hello Pulsar {i}'
producer.send(message.encode('utf-8'))
print(f'Sent message: {message}')
client.close()
|
Python 컨슈머 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')
while True:
msg = consumer.receive()
try:
print(f'Received message: {msg.data().decode("utf-8")}')
consumer.acknowledge(msg)
except Exception as e:
consumer.negative_acknowledge(msg)
client.close()
|
고급 Pulsar 기능#
스키마 레지스트리#
Pulsar는 JSON, Avro, Protobuf 등 다양한 스키마 형식을 지원한다. 스키마를 사용하면 메시지의 형식을 강제하고 타입 안전성을 보장할 수 있다.
Avro 스키마 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // Avro 스키마 정의
static class User {
private String name;
private int age;
// 생성자, getter, setter 등
}
// 스키마를 사용한 프로듀서
Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
.topic("users-topic")
.create();
// 메시지 전송
User user = new User("Alice", 30);
producer.send(user);
|
Pulsar Functions#
Pulsar Functions는 서버리스 계산 모델을 제공하여 스트림 처리 로직을 구현할 수 있게 한다.
간단한 함수 예제 (Java):
1
2
3
4
5
6
7
8
9
10
11
| public class WordCountFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
String key = word.toLowerCase();
context.incrCounter(key, 1);
}
return null;
}
}
|
함수 배포:
1
2
3
4
5
6
7
8
| bin/pulsar-admin functions create \
--jar target/my-function.jar \
--classname org.example.WordCountFunction \
--tenant public \
--namespace default \
--name word-count \
--inputs persistent://public/default/sentences \
--log-topic persistent://public/default/log-topic
|
Pulsar IO 커넥터#
Pulsar IO는 다양한 외부 시스템과의 통합을 위한 커넥터를 제공한다.
Kafka 소스 커넥터 예제:
1
2
3
4
5
6
7
8
9
10
11
| bin/pulsar-admin source create \
--tenant public \
--namespace default \
--name kafka-source \
--source-type kafka \
--destination-topic-name pulsar-kafka-topic \
--source-config '{
"bootstrapServers": "kafka-broker:9092",
"topic": "kafka-topic",
"groupId": "pulsar-kafka-group"
}'
|
지역 간 복제(Geo-replication)#
여러 데이터센터나 지역 간에 메시지를 복제할 수 있다.
복제 클러스터 설정:
1
2
3
4
5
6
7
8
9
10
11
12
13
| # 클러스터 설정
bin/pulsar-admin clusters create cluster-a \
--url http://broker-a:8080 \
--broker-url pulsar://broker-a:6650
bin/pulsar-admin clusters create cluster-b \
--url http://broker-b:8080 \
--broker-url pulsar://broker-b:6650
# 네임스페이스에 복제 설정
bin/pulsar-admin namespaces set-clusters \
public/default \
--clusters cluster-a,cluster-b
|
트랜잭션 지원#
Pulsar 2.7.0부터 트랜잭션 기능이 도입되어 정확히 한 번(exactly-once) 메시지 처리를 보장한다.
트랜잭션 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // 트랜잭션 시작
Transaction txn = client.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();
// 트랜잭션 내에서 메시지 생성
producer.newMessage(txn).value("Hello Pulsar Transaction").send();
// 트랜잭션 내에서 메시지 소비
Message<String> msg = consumer.receive();
consumer.acknowledgeAsync(msg, txn);
// 트랜잭션 커밋
txn.commit().get();
|
최적화 및 운영#
성능 튜닝
브로커 튜닝:
defaultRetentionTimeInMinutes
: 메시지 보존 기간defaultRetentionSizeInMB
: 메시지 보존 크기managedLedgerCacheSizeMB
: 관리 레저 캐시 크기numIOThreads
: I/O 스레드 수numWorkerThreads
: 작업자 스레드 수
BookKeeper 튜닝:journalMaxSizeMB
: 저널 최대 크기ledgerStorageClass
: 레저 스토리지 클래스 (DbLedgerStorage 권장)diskUsageThreshold
: 디스크 사용량 임계값
클라이언트 튜닝:- 배치 설정
- 압축 설정
- 비동기 전송
모니터링 설정
Pulsar는 Prometheus와 Grafana를 통한 모니터링을 지원한다.
- Prometheus 설정: Pulsar broker.conf에 다음을 추가:
1
2
| prometheusStatsLatencyRolloverSeconds=60
exposeTopicLevelMetricsInPrometheus=true
|
- Grafana 대시보드: Pulsar는 다양한 Grafana 대시보드 템플릿을 제공한다.
장애 처리 및 복구
브로커 장애:
- 브로커는 무상태(stateless)이므로 쉽게 대체 가능
- 자동 장애 조치(failover) 메커니즘 구현
BookKeeper 장애: - 복제 인자(replication factor)에 따라 내결함성 제공
- 자동 복구 메커니즘
- Auditor 기능으로 누락된 데이터 감지 및 복구
ZooKeeper 장애: - ZooKeeper 앙상블은 과반수(majority)가 정상 작동해야 함
- 최소 3개의 ZooKeeper 노드 구성 권장
백업 및 복구 전략
데이터 백업:
- BookKeeper 데이터 정기 백업
- 메타데이터(ZooKeeper) 정기 백업
재해 복구: - 지역 간 복제를 통한 재해 복구 구현
- 여러 데이터센터에 걸친 클러스터 배포
실무 적용 사례 및 디자인 패턴#
- 이벤트 소싱(Event Sourcing)
모든 상태 변경을 이벤트로 저장하고, 이 이벤트들을 재생하여 현재 상태를 재구성하는 패턴이다.
구현 예시:- 각 도메인 이벤트를 토픽에 저장
- Pulsar Functions를 사용하여 이벤트 처리 및 상태 업데이트
- 필요시 이벤트를 재생하여 상태 재구성
- CQRS(Command Query Responsibility Segregation)
명령(쓰기)과 쿼리(읽기)를 분리하는 패턴이다.
구현 예시:- 명령은 Pulsar 토픽으로 전송
- Pulsar Functions가 명령을 처리하고 상태 변경
- 변경된 상태는 별도의 읽기 전용 데이터베이스에 저장
- 쿼리는 읽기 전용 데이터베이스에서 처리
- 실시간 분석 파이프라인
Pulsar를 사용한 실시간 데이터 처리 파이프라인 구축:- 이벤트 데이터를 Pulsar 토픽으로 수집
- Pulsar Functions로 데이터 전처리 및 변환
- 처리된 데이터를 분석 시스템(예: Flink, Spark)으로 전달
- 분석 결과를 다시 Pulsar 토픽에 게시
- 결과를 대시보드나 알림 시스템에 전달
- 마이크로서비스 통신
마이크로서비스 간 통신에 Pulsar 활용:- 동기식 통신: Request-Reply 패턴 구현
- 비동기식 통신: 이벤트 기반 메시징
- 트랜잭션을 통한 일관성 보장
- 스키마 레지스트리를 통한 계약 관리
심화 주제#
Pulsar SQL#
Presto를 기반으로 한 Pulsar SQL을 사용하면 SQL 쿼리를 통해 Pulsar 토픽의 데이터를 분석할 수 있다.
설정 및 활성화:
1
| bin/pulsar sql-worker start
|
쿼리 예제:
1
2
3
| SELECT * FROM "pulsar"."public/default"."my-topic"
WHERE publish_time > TIMESTAMP '2022-01-01 00:00:00'
LIMIT 10;
|
Pulsar 관리 API#
REST API를 통해 Pulsar 클러스터를 관리할 수 있다.
토픽 생성 예제:
1
| curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/my-topic
|
토픽 통계 조회:
1
| curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats
|
보안 설정#
인증:
TLS 인증 설정:
1
2
3
4
5
| # broker.conf
tlsEnabled=true
tlsCertificateFilePath=/path/to/broker-cert.pem
tlsKeyFilePath=/path/to/broker-key.pem
tlsTrustCertsFilePath=/path/to/ca-cert.pem
|
인가:
- 역할 기반 접근 제어(RBAC)
- 슈퍼유저 및 일반 역할 설정
용어 정리#
참고 및 출처#
Apache Pulsar vs. Kafka 분산 메시징 시스템은 현대 데이터 중심 아키텍처의 중추 역할을 한다.
기본 개념 및 역사 Apache Kafka Apache Kafka는 2011년 LinkedIn에서 개발된 후 Apache 소프트웨어 재단으로 이관된 분산 스트리밍 플랫폼이다. 처음에는 LinkedIn 내부의 데이터 파이프라인 문제를 해결하기 위해 만들어졌지만, 이후 업계 표준 메시징 시스템으로 자리 잡았다. Kafka는 높은 처리량, 내구성, 확장성을 제공하는 로그 기반의 발행-구독(pub-sub) 메시징 시스템이다.
Apache Pulsar Apache Pulsar는 2016년 Yahoo에서 개발되어 2018년에 Apache 소프트웨어 재단의 최상위 프로젝트가 되었다. Pulsar는 다중 테넌트, 고성능 서비스로 설계되었으며, Kafka와 같은 발행-구독 메시징 시스템의 특성과 전통적인 메시지 큐의 장점을 결합했다. Pulsar는 처음부터 클라우드 네이티브 환경을 염두에 두고 개발되었다.
...
Apache Pulsar vs. RabbitMQ Apache Pulsar와 RabbitMQ는 메시징 시스템으로서 각각 고유한 강점과 약점을 가지고 있으며, 사용 사례에 따라 적합한 선택이 달라질 수 있다.
기본 개념 및 역사 Apache Pulsar Apache Pulsar는 2016년 Yahoo에서 개발되어 2018년 Apache 소프트웨어 재단의 최상위 프로젝트가 되었다. Pulsar는 처음부터 클라우드 네이티브 환경과 대규모 분산 시스템을 위해 설계되었으며, 높은 처리량과 낮은 지연 시간을 모두 달성하는 메시징 및 스트리밍 플랫폼이다.
RabbitMQ RabbitMQ는 2007년 Rabbit Technologies Ltd.에서 개발되었으며, 현재는 VMware의 일부인 Pivotal Software에서 관리되고 있다. Erlang으로 작성된 RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 구현한 가장 널리 사용되는 오픈 소스 메시지 브로커 중 하나이다. 신뢰성, 유연성, 상호 운용성에 중점을 두고 설계되었다.
...