RabbitMQ#
RabbitMQ는 Erlang 언어로 작성된 오픈 소스 메시지 브로커 시스템으로, AMQP(Advanced Message Queuing Protocol)를 구현하고 있다. 2007년에 처음 출시되었으며, 현재는 VMware의 자회사인 Pivotal Software에서 관리하고 있다. RabbitMQ는 안정성, 확장성, 다양한 메시징 패턴 지원 등으로 인해 많은 기업들이 메시지 기반 아키텍처의 핵심 컴포넌트로 채택하고 있다.
주요 특징#
- 표준 프로토콜 지원: AMQP 0-9-1을 기본으로 하며, STOMP, MQTT, HTTP 등 다양한 프로토콜을 플러그인을 통해 지원한다.
- 유연한 라우팅: Exchange와 바인딩을 통한 강력하고 유연한 메시지 라우팅 기능을 제공한다.
- 클러스터링: 높은 가용성과 처리량을 위한 내장 클러스터링 기능을 제공한다.
- 관리 인터페이스: 직관적인 웹 기반 관리 UI와 HTTP API를 제공한다.
- 플러그인 아키텍처: 기능을 확장할 수 있는 다양한 플러그인을 지원한다.
- 다양한 언어 지원: Java, Python, Ruby, PHP, C#, JavaScript 등 다양한 언어의 클라이언트 라이브러리를 제공한다.
RabbitMQ의 주요 사용 사례#
- 비동기 처리: 시간이 오래 걸리는 작업을 비동기적으로 처리할 수 있다.
- 서비스 간 통신: 마이크로서비스 아키텍처에서 서비스 간 통신 채널로 활용된다.
- 부하 분산: 작업을 여러 워커에게 분산시켜 시스템의 부하를 균등하게 분배한다.
- 이벤트 기반 아키텍처: 이벤트를 생성하고 소비하는 이벤트 기반 시스템의 기반이 된다.
- 데이터 스트리밍: 실시간 데이터 스트림을 처리하는 데 사용된다.
RabbitMQ의 핵심 개념#
AMQP 프로토콜
AMQP(Advanced Message Queuing Protocol)는 메시지 지향 미들웨어를 위한 개방형 표준 프로토콜이다. AMQP의 주요 개념을 이해하는 것은 RabbitMQ를 효과적으로 사용하기 위한 기본이다.
RabbitMQ 아키텍처
RabbitMQ의 아키텍처는 다음 구성 요소로 이루어져 있다:
- 프로듀서(Producer)
메시지를 생성하여 RabbitMQ로 전송하는 애플리케이션이다. 프로듀서는 메시지를 생성하고 Exchange에 발행한다. - 컨슈머(Consumer)
RabbitMQ로부터 메시지를 수신하고 처리하는 애플리케이션이다. 컨슈머는 큐에서 메시지를 구독하여 처리한다. - Exchange
프로듀서로부터 받은 메시지를 큐로 라우팅하는 라우터 역할을 한다. Exchange 타입에 따라 메시지 라우팅 방식이 달라진다.
Exchange 타입:
- Direct Exchange: 라우팅 키가 정확히 일치하는 큐에 메시지를 전달한다.
- Fanout Exchange: 바인딩된 모든 큐에 메시지를 브로드캐스트한다.
- Topic Exchange: 라우팅 키 패턴이 일치하는 큐에 메시지를 전달한다.
- Headers Exchange: 메시지 헤더 속성을 기반으로 라우팅한다.
- 큐(Queue)
메시지가 저장되는 버퍼이다. 컨슈머는 큐에서 메시지를 가져와 처리한다. 큐는 FIFO(First In, First Out) 방식으로 작동한다. - 바인딩(Binding)
Exchange와 큐 사이의 관계를 정의한다. 바인딩은 라우팅 키(Routing Key)를 사용하여 Exchange가 메시지를 어떤 큐로 전달할지 결정하는 규칙을 설정한다. - 가상 호스트(Virtual Host)
리소스(Exchange, 큐 등)를 논리적으로 그룹화하는 네임스페이스이다. 가상 호스트는 자체 사용자 권한을 가지며, 서로 다른 애플리케이션이 같은 RabbitMQ 서버를 공유할 수 있게 한다.
메시징 패턴
RabbitMQ에서 구현할 수 있는 주요 메시징 패턴:
작업 큐(Work Queue)
시간이 오래 걸리는 작업을 여러 워커에게 분산시키는 패턴이다. 라운드 로빈 방식으로 작업을 분배한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| ## Python 작업 큐 예제 (생산자)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = "복잡한 작업 내용…"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(delivery_mode=2) # 메시지 지속성 설정
)
print(f" [x] {message} 전송됨")
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # Python 작업 큐 예제 (소비자)
import pika, time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] 메시지를 기다리는 중. 종료하려면 CTRL+C를 누르세요')
def callback(ch, method, properties, body):
print(f" [x] {body.decode()} 수신됨")
time.sleep(body.count(b'.')) # 메시지 내 점의 개수만큼 대기
print(" [x] 완료")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) # 워커에게 한 번에 하나의 메시지만 할당
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
|
구독(Publish/Subscribe)
하나의 메시지를 여러 소비자에게 브로드캐스트하는 패턴이다. Fanout Exchange를 사용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # Python 발행/구독 예제 (발행자)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = "정보: 이것은 로그 메시지입니다."
channel.basic_publish(
exchange='logs',
routing_key='',
body=message
)
print(f" [x] {message} 전송됨")
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| # Python 발행/구독 예제 (구독자)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] 로그를 기다리는 중. 종료하려면 CTRL+C를 누르세요')
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
|
라우팅(Routing)
특정 기준에 따라 메시지를 선택적으로 수신하는 패턴이다. Direct Exchange를 사용한다.
토픽(Topic)
여러 기준에 따라 메시지를 패턴 매칭으로 라우팅하는 패턴이다. Topic Exchange를 사용한다.
RPC(Remote Procedure Call)
클라이언트가 요청을 보내고 응답을 기다리는 동기적인 패턴이다.
RabbitMQ 설치 및 기본 구성#
단독 서버 설치#
Linux에 설치#
1
2
3
4
5
6
7
8
| # Debian/Ubuntu
sudo apt-get install rabbitmq-server
# 서비스 시작
sudo systemctl start rabbitmq-server
# 부팅 시 자동 시작
sudo systemctl enable rabbitmq-server
|
Docker를 사용한 설치#
1
2
| # RabbitMQ 도커 이미지 실행
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
관리 인터페이스 활성화#
1
2
3
4
5
6
7
8
9
| # 관리 플러그인 활성화
sudo rabbitmq-plugins enable rabbitmq_management
# 사용자 추가
sudo rabbitmqctl add_user admin strongpassword
# 관리자 권한 부여
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
|
관리 인터페이스는 http://localhost:15672에서> 접근할 수 있으며, 기본 사용자 이름과 비밀번호는 guest/guest이다(로컬호스트에서만 작동).
클러스터 설정#
기본 클러스터 설정#
클러스터는 최소 3개의 노드로 구성하는 것이 좋다. 각 노드에서 RabbitMQ를 설치한 후 다음 단계를 따른다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| # 노드 1에서 Erlang 쿠키 확인
cat /var/lib/rabbitmq/.erlang.cookie
# 모든 노드에서 동일한 Erlang 쿠키 설정
# 노드 2, 3에서:
sudo service rabbitmq-server stop
echo "ERLANG_COOKIE_FROM_NODE_1" | sudo tee /var/lib/rabbitmq/.erlang.cookie
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
sudo service rabbitmq-server start
# 노드 2에서 노드 1에 조인
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@node1
sudo rabbitmqctl start_app
# 노드 3에서도 동일하게 수행
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@node1
sudo rabbitmqctl start_app
# 클러스터 상태 확인
sudo rabbitmqctl cluster_status
|
고가용성 설정#
미러링 큐를 통해 고가용성을 확보할 수 있다:
1
2
| # 모든 큐를 자동으로 미러링하는 정책 설정
sudo rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
|
주요 구성 파일#
RabbitMQ의 주요 구성 파일은 다음과 같다:
/etc/rabbitmq/rabbitmq.conf
: 기본 구성 파일/etc/rabbitmq/advanced.config
: 고급 Erlang 구성/etc/rabbitmq/rabbitmq-env.conf
: 환경 변수 설정
중요한 구성 매개변수:
1
2
3
4
5
| # rabbitmq.conf 예제
listeners.tcp.default = 5672
management.tcp.port = 15672
vm_memory_high_watermark.relative = 0.4
disk_free_limit.absolute = 1GB
|
RabbitMQ 클라이언트 프로그래밍#
Java 클라이언트#
의존성 추가 (Maven):
1
2
3
4
5
| <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.15.0</version>
</dependency>
|
기본 생산자 코드:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
|
기본 소비자 코드:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
|
Spring AMQP 사용#
Spring Boot 애플리케이션에서는 Spring AMQP를 통해 더 쉽게 RabbitMQ를 사용할 수 있다.
의존성 추가 (Gradle):
1
| implementation 'org.springframework.boot:spring-boot-starter-amqp'
|
RabbitMQ 구성:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public Binding binding(Queue helloQueue, DirectExchange exchange) {
return BindingBuilder.bind(helloQueue).to(exchange).with("hello");
}
}
|
메시지 생산자:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
public void send(String message) {
rabbitTemplate.convertAndSend("direct-exchange", "hello", message);
System.out.println("메시지 전송됨: " + message);
}
}
|
메시지 소비자:
1
2
3
4
5
6
7
8
9
10
11
12
| import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQReceiver {
@RabbitListener(queues = "hello")
public void receiveMessage(String message) {
System.out.println("메시지 수신됨: " + message);
// 메시지 처리 로직
}
}
|
Node.js 클라이언트#
Amqplib 설치:
기본 생산자 코드:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| const amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', (error0, connection) => {
if (error0) {
throw error0;
}
connection.createChannel((error1, channel) => {
if (error1) {
throw error1;
}
const queue = 'hello';
const msg = 'Hello World!';
channel.assertQueue(queue, {
durable: false
});
channel.sendToQueue(queue, Buffer.from(msg));
console.log(" [x] Sent %s", msg);
setTimeout(() => {
connection.close();
process.exit(0);
}, 500);
});
});
|
기본 소비자 코드:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| const amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', (error0, connection) => {
if (error0) {
throw error0;
}
connection.createChannel((error1, channel) => {
if (error1) {
throw error1;
}
const queue = 'hello';
channel.assertQueue(queue, {
durable: false
});
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, (msg) => {
console.log(" [x] Received %s", msg.content.toString());
}, {
noAck: true
});
});
});
|
고급 RabbitMQ 기능#
메시지 내구성#
메시지가 유실되지 않도록 내구성을 보장하는 방법:
지속적인 큐 선언:
1
| channel.queueDeclare(QUEUE_NAME, true, false, false, null); // durable=true
|
지속적인 메시지 발행:
1
2
3
| channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
|
메시지 확인(Acknowledge):
1
2
3
| channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
// 처리 후 명시적으로 확인
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
메시지 제어#
프리페치 카운트(Prefetch Count)#
한 번에 처리할 수 있는 메시지 수를 제한한다:
1
| channel.basicQos(1); // 한 번에 하나의 메시지만 처리
|
메시지 TTL(Time to Live)#
메시지의 유효 기간을 설정한다:
1
2
3
| Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60초 후 만료
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
|
데드 레터 익스체인지(Dead Letter Exchange)#
처리할 수 없는 메시지를 특별한 큐로 보낸다:
1
2
3
4
| Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead-letter-exchange");
args.put("x-dead-letter-routing-key", "dead-letter");
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
|
교환기 유형과 바인딩 전략#
Direct Exchange#
정확한 라우팅 키 일치에 기반한 라우팅:
1
2
| channel.exchangeDeclare("direct_logs", "direct");
channel.queueBind(queueName, "direct_logs", "error"); // error 메시지만 수신
|
Topic Exchange#
패턴 매칭을 사용한 라우팅:
1
2
| channel.exchangeDeclare("topic_logs", "topic");
channel.queueBind(queueName, "topic_logs", "kern.*"); // kern으로 시작하는 모든 메시지
|
패턴에서:
*
: 정확히 하나의 단어와 일치#
: 0개 이상의 단어와 일치
메시지 헤더에 기반한 라우팅:
1
2
3
4
5
6
7
| channel.exchangeDeclare("header_exchange", "headers");
Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("x-match", "all"); // all: 모든 헤더 일치, any: 하나라도 일치
bindingArgs.put("format", "pdf");
bindingArgs.put("type", "report");
channel.queueBind(queueName, "header_exchange", "", bindingArgs);
|
플러그인을 통한 기능 확장#
RabbitMQ는 플러그인을 통해 기능을 확장할 수 있다:
1
2
3
4
5
6
| # 플러그인 목록 보기
rabbitmq-plugins list
# 플러그인 활성화
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
유용한 플러그인:
rabbitmq_management
: 웹 기반 관리 인터페이스rabbitmq_shovel
: 브로커 간 메시지 전송rabbitmq_federation
: 브로커 간 토폴로지 연결rabbitmq_delayed_message_exchange
: 지연 메시지 전송rabbitmq_mqtt
: MQTT 프로토콜 지원rabbitmq_web_stomp
: WebSocket을 통한 STOMP 지원
최적화 및 모니터링#
6.1 성능 튜닝#
6.1.1 메모리 관리#
RabbitMQ는 기본적으로 사용 가능한 시스템 메모리의 40%를 사용하도록 설정되어 있습니다. 이 값은 조정 가능합니다:
1
2
| # rabbitmq.conf
vm_memory_high_watermark.relative = 0.6 # 메모리의 60%까지 사용
|
또는 절대값으로 설정:
1
| vm_memory_high_watermark.absolute = 2GB
|
6.1.2 디스크 공간 관리#
RabbitMQ는 디스크 공간이 부족하면 메시지 수락을 중지합니다:
1
2
| # rabbitmq.conf
disk_free_limit.absolute = 5GB
|
6.1.3 큐 최적화#
큐 타입과 큐 인자를 사용하여 성능을 최적화할 수 있습니다:
1
2
3
4
| Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum"); // 쿼럼 큐 사용 (3.8 이상)
args.put("x-max-length", 10000); // 큐 길이 제한
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
|
6.1.4 채널 및 연결 관리#
- 채널을 재사용하세요 (생성 비용이 높음)
- 필요 이상으로 많은 연결을 맺지 마세요
- 연결 풀을 사용하여 효율적으로 관리하세요
6.1.5 배치 처리#
성능을 위해 메시지를 배치로 발행하고 소비합니다:
1
2
3
4
5
6
7
| // 배치 발행
for (int i = 0; i < batchSize; i++) {
channel.basicPublish("", QUEUE_NAME, null, messages[i].getBytes());
}
// 배치 소비
channel.basicQos(100); // 한 번에 100개 메시지 처리
|
6.2 모니터링#
6.2.1 관리 UI 사용#
웹 관리 인터페이스(http://localhost:15672)에서> 다양한 메트릭을 모니터링할 수 있습니다:
- 큐, 연결, 채널 상태
- 메시지 처리량
- 노드 자원 사용량
- 클러스터 상태
6.2.2 HTTP API 사용#
HTTP API를 통해 프로그래밍 방식으로 모니터링할 수 있습니다:
1
2
| # 큐 상태 확인
curl -u guest:guest http://localhost:15672/api/queues/%2F/my_queue
|
6.2.3 Prometheus 및 Grafana 통합#
RabbitMQ Prometheus 플러그인을 사용하여 더 강력한 모니터링을 구성할 수 있습니다:
1
2
| # Prometheus 플러그인 활성화
rabbitmq-plugins enable rabbitmq_prometheus
|
Prometheus 설정:
1
2
3
4
| scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['rabbitmq:15692']
|
그런 다음 Grafana에서 RabbitMQ 대시보드를 가져와 사용할 수 있습니다.
6.3 성능 테스트#
PerfTest 도구를 사용하여 RabbitMQ 성능을 테스트할 수 있습니다:
1
2
| # 성능 테스트 실행
./runjava com.rabbitmq.perf.PerfTest -h amqp://localhost -x 1 -y 1 -u "throughput-test" -a --id "test 1"
|
중요한 성능 지표:
- 초당 메시지 수
- 소비자 및 생산자 지연 시간
- 메모리 및 CPU 사용량
- 디스크 I/O
RabbitMQ의 장애 처리 및 고가용성#
장애 시나리오 및 처리#
브로커 장애#
브로커 노드가 실패할 경우 클러스터의 다른 노드가 작업을 인계받는다. 미러링된 큐를 사용하면 데이터 손실을 방지할 수 있다. 미러링 큐는 모든 메시지의 복사본을 여러 노드에 유지하여 노드 장애 시에도 메시지를 사용할 수 있게 한다.
RabbitMQ 3.8 이후 버전에서는 쿼럼 큐(Quorum Queues)라는 새로운 고가용성 큐 타입을 제공한다:
1
2
3
| Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("ha-queue", true, false, false, args);
|
쿼럼 큐는 Raft 합의 프로토콜을 사용하여 더 강력한 일관성과 내결함성을 제공한다.
네트워크 분할#
네트워크 분할(Network Partition) 또는 “브레인 스플릿(Brain Split)“이 발생하면 RabbitMQ 클러스터가 여러 부분으로 나뉘어 각각 독립적으로 작동하게 된다. 이 문제를 해결하기 위한 정책을 구성할 수 있다:
1
2
| # rabbitmq.conf
cluster_partition_handling = autoheal # 자동 복구 모드
|
가능한 설정:
ignore
: 분할을 무시하고 각 부분이 독립적으로 작동autoheal
: 자동으로 복구 시도 (소수 파티션을 재시작)pause_minority
: 소수 파티션 노드를 일시 중지
디스크 공간 부족#
디스크 공간이 부족하면 RabbitMQ는 새 메시지의 수락을 중지한다.
이를 위한 조치:
- 중요하지 않은 메시지 제거
- 오래된 메시지 제거를 위한 TTL 설정
- 디스크 경보 임계값 조정
1
2
| # 디스크 경보 임계값 조정
rabbitmqctl set_disk_free_limit 5GB
|
메모리 경보#
메모리 사용량이 높으면 RabbitMQ는 생산자의 속도를 제한한다:
1
2
| # 메모리 경보 임계값 조정
rabbitmqctl set_vm_memory_high_watermark 0.6
|
고가용성 구성#
클러스터링#
기본 RabbitMQ 클러스터는 메타데이터를 복제하지만 큐의 내용은 복제하지 않는다. 높은 가용성을 위해서는 미러링 큐나 쿼럼 큐를 사용해야 한다.
미러링 큐 정책#
관리 UI 또는 CLI를 통해 미러링 정책을 설정할 수 있다:
1
2
3
4
5
6
7
8
| # 모든 큐를 모든 노드에 미러링
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}' --apply-to queues
# 모든 큐를 특정 수의 노드에 미러링
rabbitmqctl set_policy ha-two ".*" '{"ha-mode":"exactly","ha-params":2}' --apply-to queues
# 대기열 이름이 "ha."로 시작하는 큐만 미러링
rabbitmqctl set_policy ha-match "^ha\." '{"ha-mode":"all"}' --apply-to queues
|
쿼럼 큐#
쿼럼 큐는 Raft 합의 알고리즘을 사용하여 메시지의 일관성을 보장한다:
1
2
3
4
5
6
7
| # 쿼럼 큐 선언 (클라이언트에서)
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("critical-queue", true, false, false, args);
# 또는 정책을 통해 설정
rabbitmqctl set_policy quorum-queues "^critical\." '{"x-queue-type":"quorum"}' --apply-to queues
|
쿼럼 큐의 특징:
- 강력한 내구성과 일관성
- 브로커 장애에 더 잘 대응
- 제한된 기능 (예: 임시 큐 지원 안 함)
- 미러링 큐보다 더 효율적인 리소스 사용
가용성 영역 간 배포#
클라우드 환경에서는 여러 가용성 영역(AZ)에 걸쳐 RabbitMQ 노드를 배포하여 전체 AZ 장애에도 서비스를 유지할 수 있다:
1
2
3
| 노드 1: AZ-1
노드 2: AZ-2
노드 3: AZ-3
|
이 설정은 AZ 간 네트워크 지연을 고려해야 한다.
페더레이션 플러그인#
federation 플러그인을 사용하면 지역적으로 분산된 브로커 간에 메시지를 전달할 수 있다:
1
2
3
4
5
6
7
8
9
| # 페더레이션 플러그인 활성화
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
# 업스트림 설정
rabbitmqctl set_parameter federation-upstream my-upstream '{"uri":"amqp://remote-host"}'
# 정책 설정
rabbitmqctl set_policy federate-me "^federated\." '{"federation-upstream-set":"all"}' --apply-to exchanges
|
백업 및 복구 전략#
정의 백업#
RabbitMQ의 토폴로지 정의(exchanges, queues, bindings, policies)를 백업하는 것이 중요하다:
1
2
3
4
5
| # 정의 내보내기
rabbitmqctl export_definitions /path/to/definitions.json
# 정의 가져오기
rabbitmqctl import_definitions /path/to/definitions.json
|
메시지 백업#
메시지 백업을 위한 몇 가지 전략:
- 쇼벨 플러그인을 사용하여 중요한 메시지를 백업 큐로 복사
- 중요한 메시지를 소비하여 외부 스토리지에 저장
- 미러링 큐 사용으로 메시지 복제
보안 및 인증#
사용자 관리 및 권한#
RabbitMQ는 사용자 계정과 권한을 관리하는 기본 시스템을 제공한다:
1
2
3
4
5
6
7
8
| # 사용자 생성
rabbitmqctl add_user myuser mypassword
# 태그 설정 (관리자, 모니터링 등)
rabbitmqctl set_user_tags myuser administrator
# 권한 설정 (vhost, configure, write, read)
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
|
권한 패턴은 정규식을 사용하여 리소스에 대한 액세스를 제어한다:
- Configure: 리소스 생성 및 삭제
- Write: 메시지 발행
- Read: 메시지 소비
SSL/TLS 구성#
보안 통신을 위해 SSL/TLS를 구성할 수 있다:
1
2
3
4
5
6
7
8
| # rabbitmq.conf
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
listeners.ssl.default = 5671
|
클라이언트 측 설정:
1
2
3
4
| ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol("TLSv1.2");
|
LDAP 통합#
LDAP 플러그인을 사용하여 기존 디렉토리 서비스와 통합할 수 있다:
1
2
| # LDAP 플러그인 활성화
rabbitmq-plugins enable rabbitmq_auth_backend_ldap
|
구성 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
| # rabbitmq.conf
auth_backends.1 = ldap
# LDAP 서버 설정
auth_ldap.servers.1 = ldap.example.com
auth_ldap.port = 389
auth_ldap.user_dn_pattern = cn=${username},ou=People,dc=example,dc=com
# 가상 호스트 액세스 제어
auth_ldap.vhost_access_query = {in_group, "cn=rabbitmq,ou=Groups,dc=example,dc=com"}
# 태그 쿼리
auth_ldap.tag_queries.administrator = {in_group, "cn=rabbitmq-admin,ou=Groups,dc=example,dc=com"}
|
실무 적용 전략과 패턴#
큐 설계 패턴#
작업 큐 패턴#
시간이 오래 걸리는 작업을 비동기적으로 처리한다:
1
2
3
| 클라이언트 -> 작업 큐 -> 워커 1
-> 워커 2
-> 워커 3
|
구현 시 고려사항:
- 작업 큐는 내구성이 있어야 함(durable)
- 메시지는 지속적이어야 함(persistent)
- 공정한 작업 분배를 위해 prefetch 설정
- 명시적인 확인(ack)으로 안전하게 처리
발행/구독 패턴#
이벤트를 여러 소비자에게 브로드캐스트한다:
1
2
3
| 발행자 -> Fanout Exchange -> 큐 1 -> 소비자 1
-> 큐 2 -> 소비자 2
-> 큐 3 -> 소비자 3
|
사용 사례:
요청/응답 패턴#
RPC 스타일 통신을 구현한다:
1
2
| 클라이언트 -> 요청 큐 -> 서버
<- 응답 큐 <-
|
구현:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| // 클라이언트 측
final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
// 서버 측
String response = // 요청 처리
channel.basicPublish("", properties.getReplyTo(),
new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build(),
response.getBytes());
|
경쟁 소비자 패턴#
여러 소비자가 동일한 큐에서 메시지를 경쟁적으로 소비한다:
1
2
3
| 생산자 -> 큐 -> 소비자 1
-> 소비자 2
-> 소비자 3
|
이 패턴은 부하 분산에 유용하다. 각 메시지는 하나의 소비자에게만 전달된다.
메시지 및 라우팅 전략#
콘텐츠 기반 라우팅#
메시지 내용에 따라 다른 큐로 라우팅한다:
1
2
3
| 생산자 -> Topic Exchange -> Routing Pattern "*.error" -> 오류 처리 큐
-> Routing Pattern "*.warning" -> 경고 처리 큐
-> Routing Pattern "*.info" -> 정보 처리 큐
|
계층적 토픽#
계층적 구조를 사용하여 메시지를 조직한다:
1
| region.service.severity
|
예:
us-east.orders.error
: 미국 동부 지역의 주문 서비스 오류eu-west.payments.info
: EU 서부 지역의 결제 서비스 정보
시간 지연 메시지#
scheduled-messages 플러그인 또는 TTL과 데드 레터 교환기를 사용하여 지연된 처리를 구현한다:
1
2
3
4
5
6
7
8
9
10
| # 지연 교환기 선언 (플러그인 필요)
channel.exchangeDeclare("delayed", "x-delayed-message", true, false,
Map.of("x-delayed-type", "direct"));
# 지연 메시지 전송
channel.basicPublish("delayed", routingKey,
new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 5000)) // 5초 지연
.build(),
message.getBytes());
|
마이크로서비스 통합 패턴#
이벤트 기반 통신#
서비스 간 느슨한 결합을 위해 이벤트를 사용한다:
1
2
3
| 주문 서비스 -> "주문 생성" 이벤트 -> 재고 서비스
-> 결제 서비스
-> 배송 서비스
|
이 패턴은 서비스의 독립적인 확장과 변경을 가능하게 한다.
장애 격리#
Circuit Breaker 패턴을 RabbitMQ와 함께 사용하여 장애 전파를 방지한다:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // 메시지 처리 실패 시
if (!processMessage(message)) {
// 재시도 횟수 초과
if (getRetryCount(message) > MAX_RETRIES) {
// 데드 레터 큐로 전송
channel.basicPublish("dead-letter-exchange", "failed-messages",
null, message.getBytes());
} else {
// 재시도 큐로 전송
Map<String, Object> headers = new HashMap<>();
headers.put("x-retry-count", getRetryCount(message) + 1);
channel.basicPublish("retry-exchange", "retry.messages",
new AMQP.BasicProperties.Builder().headers(headers).build(),
message.getBytes());
}
}
|
CQRS(Command Query Responsibility Segregation)#
명령과 쿼리를 분리하는 CQRS 패턴에서 RabbitMQ는 명령 측과 쿼리 측 사이의 통신을 지원한다:
1
2
| 클라이언트 -> 명령 -> 명령 핸들러 -> "상태 변경" 이벤트 -> 이벤트 핸들러 -> 쿼리 데이터베이스 업데이트
클라이언트 <- 쿼리 <- 쿼리 핸들러 <----------------------
|
실전 운영 및 대규모 배포#
클러스터 크기 조정#
클러스터 크기를 결정하는 요소:
- 메시지 처리량
- 메시지 크기
- 큐 수
- 연결 및 채널 수
- 필요한 복제 수준
일반적인 경험적 규칙:
- 최소 3개 노드로 시작 (내결함성을 위해)
- 개별 노드가 50-70% CPU 사용률에 도달하면 확장
- 메모리 사용량이 시스템 메모리의 50%를 초과하지 않도록 유지
자동화 배포#
Ansible, Chef, Puppet과 같은 구성 관리 도구를 사용하여 RabbitMQ 배포를 자동화할 수 있다.
Ansible Playbook 예제:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| ---
- hosts: rabbitmq_nodes
tasks:
- name: Add RabbitMQ repository
apt_repository:
repo: deb https://dl.bintray.com/rabbitmq/debian {{ ansible_distribution_release }} main
state: present
- name: Install RabbitMQ
apt:
name: rabbitmq-server
state: present
- name: Enable management plugin
rabbitmq_plugin:
names: rabbitmq_management
state: enabled
- name: Configure cluster
# 클러스터 구성 단계
|
대규모 배포 사례 연구#
대용량 트래픽 처리
대량의 메시지를 처리하는 시스템의 접근 방식:
- 메시지 배치 처리
- 토픽 샤딩(여러 토픽으로 분할)
- 컨슈머 그룹을 사용한 병렬 처리
- 하드웨어 최적화(SSD, 충분한 메모리, 좋은 네트워크)
글로벌 분산 시스템
여러 지역에 분산된 RabbitMQ 클러스터 간 통신:
- federation 플러그인을 사용한 메시지 복제
- 지역적으로 가까운 클러스터에 우선 연결
- 샤딩 전략을 통한 지역별 데이터 분산
고가용성 구성
1.99% 이상의 가용성을 달성하기 위한 전략:
- 여러 가용성 영역에 걸친 최소 3노드 클러스터
- 적절한 모니터링 및 자동 복구 메커니즘
- 자동 장애 조치를 위한 로드 밸런서 구성
- 클라이언트 측의 재시도 및 장애 복구 전략
최신 동향 및 미래 방향#
스트림 큐#
RabbitMQ 3.9부터 도입된 스트림 큐는 Kafka와 유사한 로그 기반 메시징 모델을 제공한다:
1
2
3
4
| Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "stream");
args.put("x-max-length-bytes", 20_000_000_000L); // 20GB
channel.queueDeclare("my-stream", true, false, false, args);
|
스트림 큐의 장점:
- 더 높은 처리량
- 더 큰 백로그 지원
- 긴 기간 메시지 보존
서버리스 통합#
RabbitMQ를 서버리스 함수와 통합하는 패턴이 증가하고 있다:
- AWS Lambda + RabbitMQ
- Azure Functions + RabbitMQ
- Google Cloud Functions + RabbitMQ
컨테이너화 및 쿠버네티스#
쿠버네티스에서 RabbitMQ를 실행하기 위한 최신 접근 방식:
- RabbitMQ 쿠버네티스 오퍼레이터 사용
- 스테이트풀셋(StatefulSet)으로 배포
- 볼륨 클레임 템플릿(PersistentVolumeClaim)으로 데이터 영속성 확보
- 커스텀 리소스 정의(CRD)를 통한 선언적 관리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: production-rabbitmq
spec:
replicas: 3
resources:
requests:
cpu: 1
memory: 2Gi
limits:
cpu: 2
memory: 4Gi
persistence:
storageClassName: fast
storage: 20Gi
|
이벤트 메시 및 클라우드 네이티브 메시징#
분산 시스템에서 이벤트 메시로 RabbitMQ를 사용하는 추세가 증가하고 있다:
- 서비스 간 비동기 통신
- 이벤트 기반 아키텍처 지원
- 도메인 주도 설계(DDD)와의 통합
- 클라우드 네이티브 애플리케이션의 백본
용어 정리#
참고 및 출처#
공식 문서 및 튜토리얼#
실습 및 단계별 가이드#
참고 도서 및 자료#