Competing Consumers

Competing Consumers 패턴은 여러 소비자(Consumer)가 동일한 메시지 채널에서 동시에 메시지를 처리하는 방식이다. 이 패턴을 통해 시스템의 처리량을 높이고 확장성과 가용성을 개선할 수 있다.

Competing Consumers 패턴은 MSA 환경에서 메시지 처리의 확장성과 효율성을 크게 향상시킬 수 있다. 하지만 메시지 순서와 같은 특정 요구사항이 있는 경우에는 신중하게 적용해야 한다.

작동 원리

  1. 메시지 생성: 애플리케이션이 메시지 큐에 메시지를 게시한다.
  2. 메시지 소비: 여러 소비자 인스턴스가 동일한 큐에서 메시지를 가져와 처리한다.
  3. 경쟁: 각 소비자는 다음 메시지를 처리하기 위해 경쟁한다.

동작 방식

  1. 메시지 생산자(Producer): 애플리케이션은 처리해야 할 작업을 메시지 형태로 메시지 큐에 게시한다.
  2. 메시지 큐(Message Queue): 게시된 메시지는 큐에 저장되어 대기한다.
  3. 메시지 소비자(Consumer): 여러 개의 소비자 인스턴스가 동일한 메시지 큐에서 메시지를 수신한다. 각 메시지는 한 번에 하나의 소비자에게만 전달되며, 이를 통해 작업이 병렬로 처리된다.

이러한 구조를 통해 시스템은 작업 부하를 여러 소비자 인스턴스에 분산시켜 병목 현상을 방지하고, 동시 처리 능력을 향상시킨다.

주요 특징

  1. 병렬 처리: 여러 소비자가 동시에 메시지를 처리하여 전체 처리량을 높인다.
  2. 자동 부하 분산: 메시지 브로커가 소비자 간에 메시지를 자동으로 분배한다.
  3. 동적 확장성: 필요에 따라 소비자를 추가하거나 제거하여 처리 능력을 조절할 수 있다.

장점

  1. 처리량 향상: 여러 소비자가 동시에 작업하여 전체 처리 속도가 빨라진다.
  2. 확장성: 소비자를 추가하여 시스템의 처리 능력을 쉽게 확장할 수 있다.
  3. 고가용성: 일부 소비자에 문제가 생겨도 다른 소비자가 계속 작업을 처리할 수 있다.
  4. 유연성: 트래픽 변동에 따라 소비자 수를 조절할 수 있다.

주의사항

  1. 메시지 순서: 여러 소비자가 동시에 처리하므로 메시지 처리 순서를 보장하기 어려울 수 있다.
  2. 병목 현상 이동: 소비자 수를 늘리면 다운스트림 리소스(예: 데이터베이스)에 부하가 집중될 수 있다.
  3. 멱등성: 동일한 메시지가 여러 번 처리되어도 문제가 없도록 소비자 로직을 설계해야 한다.

적용 시나리오

  1. 대량의 이미지 또는 비디오 처리
  2. 대규모 데이터 분석 작업
  3. 이메일 발송 시스템
  4. 주문 처리 시스템

구현 예시

Spring Boot와 RabbitMQ를 사용

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Configuration
public class RabbitConfig {
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("myQueue");
        container.setMessageListener(message -> {
            // 메시지 처리 로직
        });
        container.setConcurrentConsumers(3); // 동시 소비자 수 설정
        return container;
    }
}

이 설정은 ‘myQueue’라는 큐에서 메시지를 소비하는 3개의 동시 소비자를 생성한다.

Node.js와 RabbitMQ를 사용

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 메시지 소비자 구현
class MessageConsumer {
    constructor(options = {}) {
        this.queueName = options.queueName || 'task_queue';
        this.consumerId = crypto.randomUUID();
        this.connection = null;
        this.channel = null;
        this.prefetchCount = options.prefetchCount || 1;
    }

    async connect() {
        try {
            // RabbitMQ 연결 설정
            this.connection = await amqp.connect('amqp://localhost');
            this.channel = await this.connection.createChannel();

            // 큐 설정
            await this.channel.assertQueue(this.queueName, {
                durable: true,  // 서버 재시작 시에도 큐 유지
                maxPriority: 10 // 우선순위 설정
            });

            // 공정한 작업 분배를 위한 prefetch 설정
            await this.channel.prefetch(this.prefetchCount);

            console.log(`Consumer ${this.consumerId} connected to message queue`);
        } catch (error) {
            console.error('Connection failed:', error);
            throw error;
        }
    }

    async consume() {
        try {
            await this.channel.consume(this.queueName, async (msg) => {
                if (msg === null) {
                    return;
                }

                const task = JSON.parse(msg.content.toString());
                console.log(`Consumer ${this.consumerId} processing task:`, task);

                try {
                    // 작업 처리
                    await this.processTask(task);
                    
                    // 작업 완료 확인
                    this.channel.ack(msg);
                } catch (error) {
                    // 작업 실패 시 재시도 큐로 이동
                    this.handleProcessingError(msg, error);
                }
            });
        } catch (error) {
            console.error('Consumption error:', error);
            throw error;
        }
    }
}

참고 및 출처