RabbitMQ

RabbitMQ는 Erlang 언어로 작성된 오픈 소스 메시지 브로커 시스템으로, AMQP(Advanced Message Queuing Protocol)를 구현하고 있다. 2007년에 처음 출시되었으며, 현재는 VMware의 자회사인 Pivotal Software에서 관리하고 있다. RabbitMQ는 안정성, 확장성, 다양한 메시징 패턴 지원 등으로 인해 많은 기업들이 메시지 기반 아키텍처의 핵심 컴포넌트로 채택하고 있다.

주요 특징

RabbitMQ의 주요 사용 사례

RabbitMQ의 핵심 개념

  1. AMQP 프로토콜
    AMQP(Advanced Message Queuing Protocol)는 메시지 지향 미들웨어를 위한 개방형 표준 프로토콜이다. AMQP의 주요 개념을 이해하는 것은 RabbitMQ를 효과적으로 사용하기 위한 기본이다.

  2. RabbitMQ 아키텍처
    RabbitMQ의 아키텍처는 다음 구성 요소로 이루어져 있다:

    1. 프로듀서(Producer)
      메시지를 생성하여 RabbitMQ로 전송하는 애플리케이션이다. 프로듀서는 메시지를 생성하고 Exchange에 발행한다.
    2. 컨슈머(Consumer)
      RabbitMQ로부터 메시지를 수신하고 처리하는 애플리케이션이다. 컨슈머는 큐에서 메시지를 구독하여 처리한다.
    3. Exchange
      프로듀서로부터 받은 메시지를 큐로 라우팅하는 라우터 역할을 한다. Exchange 타입에 따라 메시지 라우팅 방식이 달라진다.
      Exchange 타입:
    • Direct Exchange: 라우팅 키가 정확히 일치하는 큐에 메시지를 전달한다.
    • Fanout Exchange: 바인딩된 모든 큐에 메시지를 브로드캐스트한다.
    • Topic Exchange: 라우팅 키 패턴이 일치하는 큐에 메시지를 전달한다.
    • Headers Exchange: 메시지 헤더 속성을 기반으로 라우팅한다.
    1. 큐(Queue)
      메시지가 저장되는 버퍼이다. 컨슈머는 큐에서 메시지를 가져와 처리한다. 큐는 FIFO(First In, First Out) 방식으로 작동한다.
    2. 바인딩(Binding)
      Exchange와 큐 사이의 관계를 정의한다. 바인딩은 라우팅 키(Routing Key)를 사용하여 Exchange가 메시지를 어떤 큐로 전달할지 결정하는 규칙을 설정한다.
    3. 가상 호스트(Virtual Host)
      리소스(Exchange, 큐 등)를 논리적으로 그룹화하는 네임스페이스이다. 가상 호스트는 자체 사용자 권한을 가지며, 서로 다른 애플리케이션이 같은 RabbitMQ 서버를 공유할 수 있게 한다.
  3. 메시징 패턴
    RabbitMQ에서 구현할 수 있는 주요 메시징 패턴:

    1. 작업 큐(Work Queue)
      시간이 오래 걸리는 작업을 여러 워커에게 분산시키는 패턴이다. 라운드 로빈 방식으로 작업을 분배한다.

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      
      ## Python 작업 큐 예제 (생산자)
      
      import pika
      
      connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
      channel = connection.channel()
      
      channel.queue_declare(queue='task_queue', durable=True)
      
      message = "복잡한 작업 내용…"
      channel.basic_publish(
      exchange='',
      routing_key='task_queue',
      body=message,
      properties=pika.BasicProperties(delivery_mode=2)  # 메시지 지속성 설정
      )
      
      print(f" [x] {message} 전송됨")
      connection.close()
      
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      
      # Python 작업 큐 예제 (소비자)
      import pika, time
      
      connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
      channel = connection.channel()
      
      channel.queue_declare(queue='task_queue', durable=True)
      print(' [*] 메시지를 기다리는 중. 종료하려면 CTRL+C를 누르세요')
      
      def callback(ch, method, properties, body):
       print(f" [x] {body.decode()} 수신됨")
       time.sleep(body.count(b'.'))  # 메시지 내 점의 개수만큼 대기
       print(" [x] 완료")
       ch.basic_ack(delivery_tag=method.delivery_tag)
      
      channel.basic_qos(prefetch_count=1)  # 워커에게 한 번에 하나의 메시지만 할당
      channel.basic_consume(queue='task_queue', on_message_callback=callback)
      
      channel.start_consuming()
      
      1. 구독(Publish/Subscribe)
        하나의 메시지를 여러 소비자에게 브로드캐스트하는 패턴이다. Fanout Exchange를 사용한다.

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        
        # Python 발행/구독 예제 (발행자)
        import pika
        
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        
        channel.exchange_declare(exchange='logs', exchange_type='fanout')
        
        message = "정보: 이것은 로그 메시지입니다."
        channel.basic_publish(
            exchange='logs',
            routing_key='',
            body=message
        )
        
        print(f" [x] {message} 전송됨")
        connection.close()
        
         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        
        # Python 발행/구독 예제 (구독자)
        import pika
        
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        
        channel.exchange_declare(exchange='logs', exchange_type='fanout')
        
        result = channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        
        channel.queue_bind(exchange='logs', queue=queue_name)
        
        print(' [*] 로그를 기다리는 중. 종료하려면 CTRL+C를 누르세요')
        
        def callback(ch, method, properties, body):
            print(f" [x] {body.decode()}")
        
        channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True)
        
        channel.start_consuming()
        
    2. 라우팅(Routing)
      특정 기준에 따라 메시지를 선택적으로 수신하는 패턴이다. Direct Exchange를 사용한다.

    3. 토픽(Topic)
      여러 기준에 따라 메시지를 패턴 매칭으로 라우팅하는 패턴이다. Topic Exchange를 사용한다.

    4. RPC(Remote Procedure Call)
      클라이언트가 요청을 보내고 응답을 기다리는 동기적인 패턴이다.

RabbitMQ 설치 및 기본 구성

단독 서버 설치

Linux에 설치
1
2
3
4
5
6
7
8
# Debian/Ubuntu
sudo apt-get install rabbitmq-server

# 서비스 시작
sudo systemctl start rabbitmq-server

# 부팅 시 자동 시작
sudo systemctl enable rabbitmq-server
Docker를 사용한 설치
1
2
# RabbitMQ 도커 이미지 실행
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

관리 인터페이스 활성화

1
2
3
4
5
6
7
8
9
# 관리 플러그인 활성화
sudo rabbitmq-plugins enable rabbitmq_management

# 사용자 추가
sudo rabbitmqctl add_user admin strongpassword

# 관리자 권한 부여
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

관리 인터페이스는 http://localhost:15672에서> 접근할 수 있으며, 기본 사용자 이름과 비밀번호는 guest/guest이다(로컬호스트에서만 작동).

클러스터 설정

기본 클러스터 설정

클러스터는 최소 3개의 노드로 구성하는 것이 좋다. 각 노드에서 RabbitMQ를 설치한 후 다음 단계를 따른다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 노드 1에서 Erlang 쿠키 확인
cat /var/lib/rabbitmq/.erlang.cookie

# 모든 노드에서 동일한 Erlang 쿠키 설정
# 노드 2, 3에서:
sudo service rabbitmq-server stop
echo "ERLANG_COOKIE_FROM_NODE_1" | sudo tee /var/lib/rabbitmq/.erlang.cookie
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
sudo service rabbitmq-server start

# 노드 2에서 노드 1에 조인
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@node1
sudo rabbitmqctl start_app

# 노드 3에서도 동일하게 수행
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@node1
sudo rabbitmqctl start_app

# 클러스터 상태 확인
sudo rabbitmqctl cluster_status

고가용성 설정

미러링 큐를 통해 고가용성을 확보할 수 있다:

1
2
# 모든 큐를 자동으로 미러링하는 정책 설정
sudo rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

주요 구성 파일

RabbitMQ의 주요 구성 파일은 다음과 같다:

중요한 구성 매개변수:

1
2
3
4
5
# rabbitmq.conf 예제
listeners.tcp.default = 5672
management.tcp.port = 15672
vm_memory_high_watermark.relative = 0.4
disk_free_limit.absolute = 1GB

RabbitMQ 클라이언트 프로그래밍

Java 클라이언트

의존성 추가 (Maven):

1
2
3
4
5
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>

기본 생산자 코드:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

기본 소비자 코드:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

Spring AMQP 사용

Spring Boot 애플리케이션에서는 Spring AMQP를 통해 더 쉽게 RabbitMQ를 사용할 수 있다.

의존성 추가 (Gradle):

1
implementation 'org.springframework.boot:spring-boot-starter-amqp'

RabbitMQ 구성:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
    
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("direct-exchange");
    }
    
    @Bean
    public Binding binding(Queue helloQueue, DirectExchange exchange) {
        return BindingBuilder.bind(helloQueue).to(exchange).with("hello");
    }
}

메시지 생산자:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQSender {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private Queue queue;
    
    public void send(String message) {
        rabbitTemplate.convertAndSend("direct-exchange", "hello", message);
        System.out.println("메시지 전송됨: " + message);
    }
}

메시지 소비자:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQReceiver {
    
    @RabbitListener(queues = "hello")
    public void receiveMessage(String message) {
        System.out.println("메시지 수신됨: " + message);
        // 메시지 처리 로직
    }
}

Node.js 클라이언트

Amqplib 설치:

1
npm install amqplib

기본 생산자 코드:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (error0, connection) => {
    if (error0) {
        throw error0;
    }
    
    connection.createChannel((error1, channel) => {
        if (error1) {
            throw error1;
        }
        
        const queue = 'hello';
        const msg = 'Hello World!';
        
        channel.assertQueue(queue, {
            durable: false
        });
        
        channel.sendToQueue(queue, Buffer.from(msg));
        console.log(" [x] Sent %s", msg);
        
        setTimeout(() => {
            connection.close();
            process.exit(0);
        }, 500);
    });
});

기본 소비자 코드:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (error0, connection) => {
    if (error0) {
        throw error0;
    }
    
    connection.createChannel((error1, channel) => {
        if (error1) {
            throw error1;
        }
        
        const queue = 'hello';
        
        channel.assertQueue(queue, {
            durable: false
        });
        
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        
        channel.consume(queue, (msg) => {
            console.log(" [x] Received %s", msg.content.toString());
        }, {
            noAck: true
        });
    });
});

고급 RabbitMQ 기능

메시지 내구성

메시지가 유실되지 않도록 내구성을 보장하는 방법:

  1. 지속적인 큐 선언:

    1
    
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);  // durable=true
    
  2. 지속적인 메시지 발행:

    1
    2
    3
    
    channel.basicPublish("", QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    
  3. 메시지 확인(Acknowledge):

    1
    2
    3
    
    channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    // 처리 후 명시적으로 확인
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    

메시지 제어

프리페치 카운트(Prefetch Count)

한 번에 처리할 수 있는 메시지 수를 제한한다:

1
channel.basicQos(1);  // 한 번에 하나의 메시지만 처리
메시지 TTL(Time to Live)

메시지의 유효 기간을 설정한다:

1
2
3
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);  // 60초 후 만료
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
데드 레터 익스체인지(Dead Letter Exchange)

처리할 수 없는 메시지를 특별한 큐로 보낸다:

1
2
3
4
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead-letter-exchange");
args.put("x-dead-letter-routing-key", "dead-letter");
channel.queueDeclare(QUEUE_NAME, true, false, false, args);

교환기 유형과 바인딩 전략

Direct Exchange

정확한 라우팅 키 일치에 기반한 라우팅:

1
2
channel.exchangeDeclare("direct_logs", "direct");
channel.queueBind(queueName, "direct_logs", "error");  // error 메시지만 수신
Topic Exchange

패턴 매칭을 사용한 라우팅:

1
2
channel.exchangeDeclare("topic_logs", "topic");
channel.queueBind(queueName, "topic_logs", "kern.*");  // kern으로 시작하는 모든 메시지

패턴에서:

Headers Exchange

메시지 헤더에 기반한 라우팅:

1
2
3
4
5
6
7
channel.exchangeDeclare("header_exchange", "headers");

Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("x-match", "all");  // all: 모든 헤더 일치, any: 하나라도 일치
bindingArgs.put("format", "pdf");
bindingArgs.put("type", "report");
channel.queueBind(queueName, "header_exchange", "", bindingArgs);

플러그인을 통한 기능 확장

RabbitMQ는 플러그인을 통해 기능을 확장할 수 있다:

1
2
3
4
5
6
# 플러그인 목록 보기
rabbitmq-plugins list

# 플러그인 활성화
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

유용한 플러그인:

최적화 및 모니터링

6.1 성능 튜닝

6.1.1 메모리 관리

RabbitMQ는 기본적으로 사용 가능한 시스템 메모리의 40%를 사용하도록 설정되어 있습니다. 이 값은 조정 가능합니다:

1
2
# rabbitmq.conf
vm_memory_high_watermark.relative = 0.6  # 메모리의 60%까지 사용

또는 절대값으로 설정:

1
vm_memory_high_watermark.absolute = 2GB

6.1.2 디스크 공간 관리

RabbitMQ는 디스크 공간이 부족하면 메시지 수락을 중지합니다:

1
2
# rabbitmq.conf
disk_free_limit.absolute = 5GB

6.1.3 큐 최적화

큐 타입과 큐 인자를 사용하여 성능을 최적화할 수 있습니다:

1
2
3
4
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");  // 쿼럼 큐 사용 (3.8 이상)
args.put("x-max-length", 10000);     // 큐 길이 제한
channel.queueDeclare(QUEUE_NAME, true, false, false, args);

6.1.4 채널 및 연결 관리

6.1.5 배치 처리

성능을 위해 메시지를 배치로 발행하고 소비합니다:

1
2
3
4
5
6
7
// 배치 발행
for (int i = 0; i < batchSize; i++) {
    channel.basicPublish("", QUEUE_NAME, null, messages[i].getBytes());
}

// 배치 소비
channel.basicQos(100);  // 한 번에 100개 메시지 처리

6.2 모니터링

6.2.1 관리 UI 사용

웹 관리 인터페이스(http://localhost:15672)에서> 다양한 메트릭을 모니터링할 수 있습니다:

6.2.2 HTTP API 사용

HTTP API를 통해 프로그래밍 방식으로 모니터링할 수 있습니다:

1
2
# 큐 상태 확인
curl -u guest:guest http://localhost:15672/api/queues/%2F/my_queue

6.2.3 Prometheus 및 Grafana 통합

RabbitMQ Prometheus 플러그인을 사용하여 더 강력한 모니터링을 구성할 수 있습니다:

1
2
# Prometheus 플러그인 활성화
rabbitmq-plugins enable rabbitmq_prometheus

Prometheus 설정:

1
2
3
4
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['rabbitmq:15692']

그런 다음 Grafana에서 RabbitMQ 대시보드를 가져와 사용할 수 있습니다.

6.3 성능 테스트

PerfTest 도구를 사용하여 RabbitMQ 성능을 테스트할 수 있습니다:

1
2
# 성능 테스트 실행
./runjava com.rabbitmq.perf.PerfTest -h amqp://localhost -x 1 -y 1 -u "throughput-test" -a --id "test 1"

중요한 성능 지표:

RabbitMQ의 장애 처리 및 고가용성

장애 시나리오 및 처리

브로커 장애

브로커 노드가 실패할 경우 클러스터의 다른 노드가 작업을 인계받는다. 미러링된 큐를 사용하면 데이터 손실을 방지할 수 있다. 미러링 큐는 모든 메시지의 복사본을 여러 노드에 유지하여 노드 장애 시에도 메시지를 사용할 수 있게 한다.

RabbitMQ 3.8 이후 버전에서는 쿼럼 큐(Quorum Queues)라는 새로운 고가용성 큐 타입을 제공한다:

1
2
3
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("ha-queue", true, false, false, args);

쿼럼 큐는 Raft 합의 프로토콜을 사용하여 더 강력한 일관성과 내결함성을 제공한다.

네트워크 분할

네트워크 분할(Network Partition) 또는 “브레인 스플릿(Brain Split)“이 발생하면 RabbitMQ 클러스터가 여러 부분으로 나뉘어 각각 독립적으로 작동하게 된다. 이 문제를 해결하기 위한 정책을 구성할 수 있다:

1
2
# rabbitmq.conf
cluster_partition_handling = autoheal  # 자동 복구 모드

가능한 설정:

디스크 공간 부족

디스크 공간이 부족하면 RabbitMQ는 새 메시지의 수락을 중지한다.

이를 위한 조치:

1
2
# 디스크 경보 임계값 조정
rabbitmqctl set_disk_free_limit 5GB
메모리 경보

메모리 사용량이 높으면 RabbitMQ는 생산자의 속도를 제한한다:

1
2
# 메모리 경보 임계값 조정
rabbitmqctl set_vm_memory_high_watermark 0.6

고가용성 구성

클러스터링

기본 RabbitMQ 클러스터는 메타데이터를 복제하지만 큐의 내용은 복제하지 않는다. 높은 가용성을 위해서는 미러링 큐나 쿼럼 큐를 사용해야 한다.

미러링 큐 정책

관리 UI 또는 CLI를 통해 미러링 정책을 설정할 수 있다:

1
2
3
4
5
6
7
8
# 모든 큐를 모든 노드에 미러링
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}' --apply-to queues

# 모든 큐를 특정 수의 노드에 미러링
rabbitmqctl set_policy ha-two ".*" '{"ha-mode":"exactly","ha-params":2}' --apply-to queues

# 대기열 이름이 "ha."로 시작하는 큐만 미러링
rabbitmqctl set_policy ha-match "^ha\." '{"ha-mode":"all"}' --apply-to queues
쿼럼 큐

쿼럼 큐는 Raft 합의 알고리즘을 사용하여 메시지의 일관성을 보장한다:

1
2
3
4
5
6
7
# 쿼럼 큐 선언 (클라이언트에서)
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("critical-queue", true, false, false, args);

# 또는 정책을 통해 설정
rabbitmqctl set_policy quorum-queues "^critical\." '{"x-queue-type":"quorum"}' --apply-to queues

쿼럼 큐의 특징:

가용성 영역 간 배포

클라우드 환경에서는 여러 가용성 영역(AZ)에 걸쳐 RabbitMQ 노드를 배포하여 전체 AZ 장애에도 서비스를 유지할 수 있다:

1
2
3
노드 1: AZ-1
노드 2: AZ-2
노드 3: AZ-3

이 설정은 AZ 간 네트워크 지연을 고려해야 한다.

페더레이션 플러그인

federation 플러그인을 사용하면 지역적으로 분산된 브로커 간에 메시지를 전달할 수 있다:

1
2
3
4
5
6
7
8
9
# 페더레이션 플러그인 활성화
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

# 업스트림 설정
rabbitmqctl set_parameter federation-upstream my-upstream '{"uri":"amqp://remote-host"}'

# 정책 설정
rabbitmqctl set_policy federate-me "^federated\." '{"federation-upstream-set":"all"}' --apply-to exchanges

백업 및 복구 전략

정의 백업

RabbitMQ의 토폴로지 정의(exchanges, queues, bindings, policies)를 백업하는 것이 중요하다:

1
2
3
4
5
# 정의 내보내기
rabbitmqctl export_definitions /path/to/definitions.json

# 정의 가져오기
rabbitmqctl import_definitions /path/to/definitions.json
메시지 백업

메시지 백업을 위한 몇 가지 전략:

보안 및 인증

사용자 관리 및 권한

RabbitMQ는 사용자 계정과 권한을 관리하는 기본 시스템을 제공한다:

1
2
3
4
5
6
7
8
# 사용자 생성
rabbitmqctl add_user myuser mypassword

# 태그 설정 (관리자, 모니터링 등)
rabbitmqctl set_user_tags myuser administrator

# 권한 설정 (vhost, configure, write, read)
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"

권한 패턴은 정규식을 사용하여 리소스에 대한 액세스를 제어한다:

SSL/TLS 구성

보안 통신을 위해 SSL/TLS를 구성할 수 있다:

1
2
3
4
5
6
7
8
# rabbitmq.conf
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

listeners.ssl.default = 5671

클라이언트 측 설정:

1
2
3
4
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol("TLSv1.2");

LDAP 통합

LDAP 플러그인을 사용하여 기존 디렉토리 서비스와 통합할 수 있다:

1
2
# LDAP 플러그인 활성화
rabbitmq-plugins enable rabbitmq_auth_backend_ldap

구성 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# rabbitmq.conf
auth_backends.1 = ldap

# LDAP 서버 설정
auth_ldap.servers.1 = ldap.example.com
auth_ldap.port = 389
auth_ldap.user_dn_pattern = cn=${username},ou=People,dc=example,dc=com

# 가상 호스트 액세스 제어
auth_ldap.vhost_access_query = {in_group, "cn=rabbitmq,ou=Groups,dc=example,dc=com"}

# 태그 쿼리
auth_ldap.tag_queries.administrator = {in_group, "cn=rabbitmq-admin,ou=Groups,dc=example,dc=com"}

실무 적용 전략과 패턴

큐 설계 패턴

작업 큐 패턴

시간이 오래 걸리는 작업을 비동기적으로 처리한다:

1
2
3
클라이언트 -> 작업 큐 -> 워커 1
                     -> 워커 2
                     -> 워커 3

구현 시 고려사항:

발행/구독 패턴

이벤트를 여러 소비자에게 브로드캐스트한다:

1
2
3
발행자 -> Fanout Exchange -> 큐 1 -> 소비자 1
                          -> 큐 2 -> 소비자 2
                          -> 큐 3 -> 소비자 3

사용 사례:

요청/응답 패턴

RPC 스타일 통신을 구현한다:

1
2
클라이언트 -> 요청 큐 -> 서버
         <- 응답 큐 <-

구현:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 클라이언트 측
final String corrId = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties
        .Builder()
        .correlationId(corrId)
        .replyTo(replyQueueName)
        .build();

channel.basicPublish("", requestQueueName, props, message.getBytes());

// 서버 측
String response = // 요청 처리
channel.basicPublish("", properties.getReplyTo(), 
    new AMQP.BasicProperties.Builder()
        .correlationId(properties.getCorrelationId())
        .build(),
    response.getBytes());
경쟁 소비자 패턴

여러 소비자가 동일한 큐에서 메시지를 경쟁적으로 소비한다:

1
2
3
생산자 -> 큐 -> 소비자 1
             -> 소비자 2
             -> 소비자 3

이 패턴은 부하 분산에 유용하다. 각 메시지는 하나의 소비자에게만 전달된다.

메시지 및 라우팅 전략

콘텐츠 기반 라우팅

메시지 내용에 따라 다른 큐로 라우팅한다:

1
2
3
생산자 -> Topic Exchange -> Routing Pattern "*.error" -> 오류 처리 큐
                         -> Routing Pattern "*.warning" -> 경고 처리 큐
                         -> Routing Pattern "*.info" -> 정보 처리 큐
계층적 토픽

계층적 구조를 사용하여 메시지를 조직한다:

1
region.service.severity

예:

시간 지연 메시지

scheduled-messages 플러그인 또는 TTL과 데드 레터 교환기를 사용하여 지연된 처리를 구현한다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 지연 교환기 선언 (플러그인 필요)
channel.exchangeDeclare("delayed", "x-delayed-message", true, false, 
    Map.of("x-delayed-type", "direct"));

# 지연 메시지 전송
channel.basicPublish("delayed", routingKey,
    new AMQP.BasicProperties.Builder()
        .headers(Map.of("x-delay", 5000))  // 5초 지연
        .build(),
    message.getBytes());

마이크로서비스 통합 패턴

이벤트 기반 통신

서비스 간 느슨한 결합을 위해 이벤트를 사용한다:

1
2
3
주문 서비스 -> "주문 생성" 이벤트 -> 재고 서비스
                                  -> 결제 서비스
                                  -> 배송 서비스

이 패턴은 서비스의 독립적인 확장과 변경을 가능하게 한다.

장애 격리

Circuit Breaker 패턴을 RabbitMQ와 함께 사용하여 장애 전파를 방지한다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 메시지 처리 실패 시
if (!processMessage(message)) {
    // 재시도 횟수 초과
    if (getRetryCount(message) > MAX_RETRIES) {
        // 데드 레터 큐로 전송
        channel.basicPublish("dead-letter-exchange", "failed-messages", 
            null, message.getBytes());
    } else {
        // 재시도 큐로 전송
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-retry-count", getRetryCount(message) + 1);
        channel.basicPublish("retry-exchange", "retry.messages", 
            new AMQP.BasicProperties.Builder().headers(headers).build(), 
            message.getBytes());
    }
}
CQRS(Command Query Responsibility Segregation)

명령과 쿼리를 분리하는 CQRS 패턴에서 RabbitMQ는 명령 측과 쿼리 측 사이의 통신을 지원한다:

1
2
클라이언트 -> 명령 -> 명령 핸들러 -> "상태 변경" 이벤트 -> 이벤트 핸들러 -> 쿼리 데이터베이스 업데이트
클라이언트 <- 쿼리 <- 쿼리 핸들러 <----------------------

실전 운영 및 대규모 배포

클러스터 크기 조정

클러스터 크기를 결정하는 요소:

일반적인 경험적 규칙:

자동화 배포

Ansible, Chef, Puppet과 같은 구성 관리 도구를 사용하여 RabbitMQ 배포를 자동화할 수 있다.

Ansible Playbook 예제:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
---
- hosts: rabbitmq_nodes
  tasks:
    - name: Add RabbitMQ repository
      apt_repository:
        repo: deb https://dl.bintray.com/rabbitmq/debian {{ ansible_distribution_release }} main
        state: present
        
    - name: Install RabbitMQ
      apt:
        name: rabbitmq-server
        state: present
        
    - name: Enable management plugin
      rabbitmq_plugin:
        names: rabbitmq_management
        state: enabled
        
    - name: Configure cluster
      # 클러스터 구성 단계

대규모 배포 사례 연구

  1. 대용량 트래픽 처리
    대량의 메시지를 처리하는 시스템의 접근 방식:

    • 메시지 배치 처리
    • 토픽 샤딩(여러 토픽으로 분할)
    • 컨슈머 그룹을 사용한 병렬 처리
    • 하드웨어 최적화(SSD, 충분한 메모리, 좋은 네트워크)
  2. 글로벌 분산 시스템
    여러 지역에 분산된 RabbitMQ 클러스터 간 통신:

    • federation 플러그인을 사용한 메시지 복제
    • 지역적으로 가까운 클러스터에 우선 연결
    • 샤딩 전략을 통한 지역별 데이터 분산
  3. 고가용성 구성
    1.99% 이상의 가용성을 달성하기 위한 전략:

    • 여러 가용성 영역에 걸친 최소 3노드 클러스터
    • 적절한 모니터링 및 자동 복구 메커니즘
    • 자동 장애 조치를 위한 로드 밸런서 구성
    • 클라이언트 측의 재시도 및 장애 복구 전략

최신 동향 및 미래 방향

스트림 큐

RabbitMQ 3.9부터 도입된 스트림 큐는 Kafka와 유사한 로그 기반 메시징 모델을 제공한다:

1
2
3
4
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "stream");
args.put("x-max-length-bytes", 20_000_000_000L);  // 20GB
channel.queueDeclare("my-stream", true, false, false, args);

스트림 큐의 장점:

서버리스 통합

RabbitMQ를 서버리스 함수와 통합하는 패턴이 증가하고 있다:

컨테이너화 및 쿠버네티스

쿠버네티스에서 RabbitMQ를 실행하기 위한 최신 접근 방식:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: production-rabbitmq
spec:
  replicas: 3
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 2
      memory: 4Gi
  persistence:
    storageClassName: fast
    storage: 20Gi

이벤트 메시 및 클라우드 네이티브 메시징

분산 시스템에서 이벤트 메시로 RabbitMQ를 사용하는 추세가 증가하고 있다:


용어 정리

용어설명

참고 및 출처

공식 문서 및 튜토리얼

실습 및 단계별 가이드

참고 도서 및 자료