Competing Consumers#
Competing Consumers 패턴은 여러 소비자(Consumer)가 동일한 메시지 채널에서 동시에 메시지를 처리하는 방식이다. 이 패턴을 통해 시스템의 처리량을 높이고 확장성과 가용성을 개선할 수 있다.
Competing Consumers 패턴은 MSA 환경에서 메시지 처리의 확장성과 효율성을 크게 향상시킬 수 있다. 하지만 메시지 순서와 같은 특정 요구사항이 있는 경우에는 신중하게 적용해야 한다.
작동 원리#
- 메시지 생성: 애플리케이션이 메시지 큐에 메시지를 게시한다.
- 메시지 소비: 여러 소비자 인스턴스가 동일한 큐에서 메시지를 가져와 처리한다.
- 경쟁: 각 소비자는 다음 메시지를 처리하기 위해 경쟁한다.
동작 방식#
- 메시지 생산자(Producer): 애플리케이션은 처리해야 할 작업을 메시지 형태로 메시지 큐에 게시한다.
- 메시지 큐(Message Queue): 게시된 메시지는 큐에 저장되어 대기한다.
- 메시지 소비자(Consumer): 여러 개의 소비자 인스턴스가 동일한 메시지 큐에서 메시지를 수신한다. 각 메시지는 한 번에 하나의 소비자에게만 전달되며, 이를 통해 작업이 병렬로 처리된다.
이러한 구조를 통해 시스템은 작업 부하를 여러 소비자 인스턴스에 분산시켜 병목 현상을 방지하고, 동시 처리 능력을 향상시킨다.
주요 특징#
- 병렬 처리: 여러 소비자가 동시에 메시지를 처리하여 전체 처리량을 높인다.
- 자동 부하 분산: 메시지 브로커가 소비자 간에 메시지를 자동으로 분배한다.
- 동적 확장성: 필요에 따라 소비자를 추가하거나 제거하여 처리 능력을 조절할 수 있다.
- 처리량 향상: 여러 소비자가 동시에 작업하여 전체 처리 속도가 빨라진다.
- 확장성: 소비자를 추가하여 시스템의 처리 능력을 쉽게 확장할 수 있다.
- 고가용성: 일부 소비자에 문제가 생겨도 다른 소비자가 계속 작업을 처리할 수 있다.
- 유연성: 트래픽 변동에 따라 소비자 수를 조절할 수 있다.
주의사항#
- 메시지 순서: 여러 소비자가 동시에 처리하므로 메시지 처리 순서를 보장하기 어려울 수 있다.
- 병목 현상 이동: 소비자 수를 늘리면 다운스트림 리소스(예: 데이터베이스)에 부하가 집중될 수 있다.
- 멱등성: 동일한 메시지가 여러 번 처리되어도 문제가 없도록 소비자 로직을 설계해야 한다.
적용 시나리오#
- 대량의 이미지 또는 비디오 처리
- 대규모 데이터 분석 작업
- 이메일 발송 시스템
- 주문 처리 시스템
구현 예시#
Spring Boot와 RabbitMQ를 사용#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| @Configuration
public class RabbitConfig {
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("myQueue");
container.setMessageListener(message -> {
// 메시지 처리 로직
});
container.setConcurrentConsumers(3); // 동시 소비자 수 설정
return container;
}
}
|
이 설정은 ‘myQueue’라는 큐에서 메시지를 소비하는 3개의 동시 소비자를 생성한다.
Node.js와 RabbitMQ를 사용#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| // 메시지 소비자 구현
class MessageConsumer {
constructor(options = {}) {
this.queueName = options.queueName || 'task_queue';
this.consumerId = crypto.randomUUID();
this.connection = null;
this.channel = null;
this.prefetchCount = options.prefetchCount || 1;
}
async connect() {
try {
// RabbitMQ 연결 설정
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
// 큐 설정
await this.channel.assertQueue(this.queueName, {
durable: true, // 서버 재시작 시에도 큐 유지
maxPriority: 10 // 우선순위 설정
});
// 공정한 작업 분배를 위한 prefetch 설정
await this.channel.prefetch(this.prefetchCount);
console.log(`Consumer ${this.consumerId} connected to message queue`);
} catch (error) {
console.error('Connection failed:', error);
throw error;
}
}
async consume() {
try {
await this.channel.consume(this.queueName, async (msg) => {
if (msg === null) {
return;
}
const task = JSON.parse(msg.content.toString());
console.log(`Consumer ${this.consumerId} processing task:`, task);
try {
// 작업 처리
await this.processTask(task);
// 작업 완료 확인
this.channel.ack(msg);
} catch (error) {
// 작업 실패 시 재시도 큐로 이동
this.handleProcessingError(msg, error);
}
});
} catch (error) {
console.error('Consumption error:', error);
throw error;
}
}
}
|
참고 및 출처#