Messaging Queues#
메시징 큐는 현대 시스템 아키텍처에서 핵심적인 통합 패턴으로, 특히 분산 시스템과 마이크로서비스 환경에서 중요한 역할을 한다. 이는 시스템 간의 느슨한 결합, 확장성, 복원력, 비동기 통신 등 다양한 이점을 제공하기 때문이다.
특히 API 통합 측면에서 메시징 큐는 다음과 같은 중요한 역할을 한다:
- 트래픽 관리: API 게이트웨이와 메시징 큐를 결합하여 급증하는 트래픽을 효과적으로 관리할 수 있다.
- 서비스 격리: 서비스 간 직접적인 의존성을 제거하여 한 서비스의 장애가 전체 시스템에 미치는 영향을 최소화한다.
- 비동기 통신: 즉각적인 응답이 필요 없는 작업을 비동기적으로 처리하여 API 응답 시간을 개선한다.
- 데이터 통합: 여러 시스템 간의 데이터 흐름을 조정하고 일관성을 유지한다.
- 이벤트 기반 아키텍처: 이벤트 생성, 전파, 소비를 지원하여 이벤트 기반 시스템의 기반이 된다.
메시징 큐를 효과적으로 활용하기 위해서는 메시지 설계, 큐 구조, 오류 처리, 확장성, 모니터링 등 다양한 측면에서 신중한 계획과 구현이 필요하다. 또한 비즈니스 요구사항과 기술적 제약을 고려하여 적절한 메시징 기술을 선택하는 것이 중요하다.
최신 클라우드 네이티브 환경에서는 Kafka, RabbitMQ, Amazon SQS/SNS, Google Pub/Sub 등 다양한 메시징 솔루션을 활용할 수 있으며, 각각의 솔루션은 특정 사용 사례와 요구사항에 더 적합하다. 각 조직의 상황에 맞는 메시징 큐 전략을 수립하고 구현하는 것이 성공적인 API 통합의 핵심 요소이다.
결론적으로, 메시징 큐는 단순한 통신 메커니즘 이상의 의미를 가진다. 이는 확장 가능하고, 유연하며, 복원력 있는 시스템 아키텍처를 구축하기 위한 전략적 도구이다. API 통합 패턴으로서 메시징 큐를 이해하고 활용함으로써, 조직은 더 강력하고 미래 지향적인 시스템을 구축할 수 있다.
메시징 큐의 기본 개념#
메시징 큐는 비동기 통신을 가능하게 하는 중간 저장소로, 메시지 생산자(Producer)와 소비자(Consumer) 사이에서 데이터를 버퍼링하는 역할을 한다. 이는 마치 우체통과 같이 작동한다 - 발신자가 메시지를 보내면 우체통(큐)에 저장되고, 수신자는 자신의 속도와 능력에 맞춰 메시지를 가져가 처리한다.
메시징 큐의 핵심 기능은 다음과 같다:
- 비동기 통신: 생산자와 소비자가 동시에 활성화될 필요가 없음
- 버퍼링: 일시적인 부하 증가나 소비자 장애 시 메시지 보존
- 분리(Decoupling): 시스템 구성 요소 간의 직접적인 의존성 제거
- 부하 분산: 여러 소비자 간에 작업 분배 가능
메시징 큐의 주요 구성 요소#
메시지(Message)#
메시징 시스템의 기본 단위로, 일반적으로 다음 요소로 구성된다:
- 헤더(Header): 메타데이터(메시지 ID, 타임스탬프, 우선순위 등)
- 본문(Body): 실제 전송되는 데이터(JSON, XML, 바이너리 등)
- 속성(Properties): 라우팅 키, 만료 시간 등의 추가 정보
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // 메시지 예시
{
"header": {
"messageId": "msg-123456",
"timestamp": "2025-03-23T10:15:30Z",
"priority": 1
},
"body": {
"orderId": "ORD-9876",
"customerId": "CUST-5432",
"items": [
{"productId": "PROD-001", "quantity": 2},
{"productId": "PROD-015", "quantity": 1}
],
"totalAmount": 79.98
},
"properties": {
"routingKey": "orders.new",
"contentType": "application/json",
"expirationTime": 86400
}
}
|
큐(Queue)#
메시지가 저장되는 데이터 구조로, 일반적으로 FIFO(First-In-First-Out) 방식을 따른다.
큐의 특성은 다음과 같다:
- 내구성(Durability): 시스템 재시작 후에도 메시지 보존 여부
- 배타성(Exclusivity): 단일 소비자만 접근 가능 여부
- 자동 삭제(Auto-delete): 모든 소비자 연결 해제 시 자동 삭제 여부
익스체인지(Exchange)#
일부 메시징 시스템(RabbitMQ 등)에서 사용되는 개념으로, 생산자로부터 메시지를 받아 라우팅 규칙에 따라 적절한 큐로 전달한다.
주요 타입은 다음과 같다:
- 다이렉트(Direct): 정확한 라우팅 키 매칭
- 토픽(Topic): 패턴 기반 라우팅
- 팬아웃(Fanout): 모든 바인딩된 큐에 브로드캐스트
- 헤더(Headers): 헤더 속성 기반 라우팅
생산자(Producer)#
메시지를 생성하고 메시징 시스템에 전송하는 애플리케이션이다.
생산자의 주요 책임은 다음과 같다:
- 메시지 형식 정의 및 생성
- 적절한 라우팅 정보 제공
- 필요시 전송 확인 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| # RabbitMQ를 사용한 메시지 생산자 예시 (Python)
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 큐 선언
channel.queue_declare(queue='order_processing', durable=True)
# 메시지 생성
message = {
'orderId': 'ORD-9876',
'customerId': 'CUST-5432',
'items': [
{'productId': 'PROD-001', 'quantity': 2},
{'productId': 'PROD-015', 'quantity': 1}
],
'totalAmount': 79.98
}
# 메시지 발행
channel.basic_publish(
exchange='',
routing_key='order_processing',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # 메시지 지속성 보장
content_type='application/json'
)
)
print(f"주문 처리 메시지 발행: {message['orderId']}")
connection.close()
|
소비자(Consumer)#
큐에서 메시지를 수신하고 처리하는 애플리케이션이다.
소비자의 주요 책임은 다음과 같다:
- 메시지 수신 및 처리
- 처리 성공/실패 확인(Acknowledgment)
- 오류 처리 및 재시도 전략 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| # RabbitMQ를 사용한 메시지 소비자 예시 (Python)
import pika
import json
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 큐 선언 (생산자와 동일한 큐)
channel.queue_declare(queue='order_processing', durable=True)
# 한 번에 하나의 메시지만 처리하도록 설정
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
# 메시지 처리
order = json.loads(body)
print(f"주문 처리 중: {order['orderId']}")
# 실제 비즈니스 로직 처리
process_order(order)
# 처리 완료 확인
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"주문 처리 완료: {order['orderId']}")
def process_order(order):
# 주문 처리 로직 구현
time.sleep(2) # 처리 시간 시뮬레이션
# 메시지 소비 시작
channel.basic_consume(queue='order_processing', on_message_callback=callback)
print('주문 처리 서비스 실행 중... Ctrl+C로 종료')
channel.start_consuming()
|
API 통합을 위한 메시징 큐 패턴#
작업 큐(Work Queue) 패턴#
시간이 많이 소요되거나 리소스 집약적인 작업을 비동기적으로 처리하기 위한 패턴이다. API 요청을 즉시 처리하는 대신 큐에 작업을 추가하고 응답을 빠르게 반환한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| // Node.js Express API 엔드포인트 예시
const express = require('express');
const amqp = require('amqplib');
const app = express();
app.use(express.json());
// RabbitMQ 연결
let channel;
async function connectQueue() {
const connection = await amqp.connect('amqp://localhost');
channel = await connection.createChannel();
await channel.assertQueue('image_processing', { durable: true });
}
connectQueue();
// 이미지 처리 API 엔드포인트
app.post('/api/images/process', async (req, res) => {
const { imageUrl, filters, userId } = req.body;
// 작업 ID 생성
const jobId = generateJobId();
// 작업을 큐에 추가
channel.sendToQueue('image_processing', Buffer.from(JSON.stringify({
jobId,
imageUrl,
filters,
userId,
timestamp: new Date().toISOString()
})), { persistent: true });
// 작업 ID와 상태 반환
res.status(202).json({
jobId,
status: 'processing',
statusUrl: `/api/jobs/${jobId}`
});
});
// 작업 상태 확인 API 엔드포인트
app.get('/api/jobs/:jobId', async (req, res) => {
const { jobId } = req.params;
// 작업 상태 조회 로직
const jobStatus = await getJobStatus(jobId);
res.json(jobStatus);
});
app.listen(3000, () => {
console.log('API 서버가 포트 3000에서 실행 중입니다.');
});
|
게시-구독(Pub-Sub) 패턴#
하나의 메시지를 여러 소비자에게 전달하는 패턴으로, 이벤트 기반 아키텍처의 기반이 된다. 이벤트가 발생하면 관심 있는 모든 서비스가 알림을 받을 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| // C# 이벤트 발행 서비스 예시
public class OrderService
{
private readonly IMessageBroker _messageBroker;
private readonly IOrderRepository _orderRepository;
public OrderService(IMessageBroker messageBroker, IOrderRepository orderRepository)
{
_messageBroker = messageBroker;
_orderRepository = orderRepository;
}
public async Task<Order> CreateOrderAsync(OrderRequest request)
{
// 주문 생성 및 저장
var order = new Order
{
Id = Guid.NewGuid(),
CustomerId = request.CustomerId,
Items = request.Items,
TotalAmount = request.Items.Sum(i => i.Price * i.Quantity),
Status = OrderStatus.Created,
CreatedAt = DateTime.UtcNow
};
await _orderRepository.SaveAsync(order);
// 주문 생성 이벤트 발행
await _messageBroker.PublishAsync("orders.created", new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount,
Items = order.Items.Select(i => new OrderItemEvent
{
ProductId = i.ProductId,
Quantity = i.Quantity,
Price = i.Price
}).ToList(),
CreatedAt = order.CreatedAt
});
return order;
}
}
|
요청-응답(Request-Reply) 패턴#
비동기적인 요청-응답 상호작용을 구현하는 패턴으로, 응답 큐와 상관관계 ID를 사용하여 응답을 추적한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| // Java Spring Boot 요청-응답 패턴 예시
@Service
public class ProductService {
private final RabbitTemplate rabbitTemplate;
private final String requestQueue = "product.info.request";
private final String replyQueue = "product.info.reply";
public ProductService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public CompletableFuture<ProductInfo> getProductInfo(String productId) {
CompletableFuture<ProductInfo> future = new CompletableFuture<>();
String correlationId = UUID.randomUUID().toString();
// 응답을 수신할 콜백 등록
rabbitTemplate.convertAndSend(
requestQueue,
new ProductInfoRequest(productId),
message -> {
message.getMessageProperties().setReplyTo(replyQueue);
message.getMessageProperties().setCorrelationId(correlationId);
return message;
}
);
// 비동기 응답 처리
rabbitTemplate.receive(replyQueue, 30000, message -> {
if (correlationId.equals(message.getMessageProperties().getCorrelationId())) {
ProductInfo productInfo = (ProductInfo) rabbitTemplate.getMessageConverter()
.fromMessage(message);
future.complete(productInfo);
}
});
return future;
}
}
|
경쟁 소비자(Competing Consumers) 패턴#
여러 소비자가 동일한 큐에서 메시지를 처리하여 부하를 분산하는 패턴이다. 메시지 처리량을 높이고 시스템의 확장성을 개선한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| # Python FastAPI 경쟁 소비자 패턴 예시 (Celery 사용)
from fastapi import FastAPI, BackgroundTasks
from celery import Celery
from typing import List
import time
app = FastAPI()
# Celery 설정
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# 데이터 처리 작업 정의
@celery_app.task
def process_data_chunk(chunk_id: int, data: List[dict]):
print(f"Chunk {chunk_id} 처리 시작: {len(data)} 항목")
# 실제 데이터 처리 로직
time.sleep(len(data) * 0.1) # 데이터 처리 시뮬레이션
results = [item['value'] * 2 for item in data]
print(f"Chunk {chunk_id} 처리 완료")
return results
@app.post("/api/data/process")
async def process_large_dataset(data: List[dict], background_tasks: BackgroundTasks):
# 대용량 데이터를 청크로 분할
chunk_size = 100
data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
# 각 청크를 별도 작업으로 큐에 추가
task_ids = []
for i, chunk in enumerate(data_chunks):
task = process_data_chunk.delay(i, chunk)
task_ids.append(task.id)
return {
"message": f"{len(data)} 항목이 {len(data_chunks)} 청크로 처리 중입니다.",
"task_ids": task_ids,
"status_url": "/api/tasks/status"
}
@app.get("/api/tasks/status")
async def get_tasks_status(task_ids: List[str]):
results = {}
for task_id in task_ids:
task = celery_app.AsyncResult(task_id)
results[task_id] = {
"status": task.status,
"result": task.result if task.ready() else None
}
return results
|
메시징 큐의 고급 기능과 패턴#
메시지 우선순위(Priority)#
중요도에 따라 메시지 처리 순서를 조정하는 기능이다. 긴급한 메시지가 먼저 처리되도록 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| // Java에서 우선순위 큐 사용 예시
@Service
public class NotificationService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public NotificationService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendNotification(Notification notification) {
// 알림 유형에 따라 우선순위 설정
int priority = switch (notification.getType()) {
case ALERT -> 10; // 가장 높은 우선순위
case WARNING -> 5;
case INFO -> 1; // 가장 낮은 우선순위
default -> 1;
};
rabbitTemplate.convertAndSend(
"notifications.exchange",
"notifications.queue",
notification,
message -> {
message.getMessageProperties().setPriority(priority);
return message;
}
);
}
}
|
재시도 큐(Retry Queue)와 데드 레터 큐(Dead Letter Queue)#
처리 실패한 메시지를 재시도하거나 분석을 위해 별도의 큐로 이동시키는 패턴이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
| // Node.js에서 재시도 큐와 데드 레터 큐 구현 예시
const amqp = require('amqplib');
async function setupQueues() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// 데드 레터 익스체인지 및 큐 설정
await channel.assertExchange('dlx.payment', 'direct');
await channel.assertQueue('payment.failed', {
durable: true,
arguments: {
'x-message-ttl': 86400000 // 24시간 보관
}
});
await channel.bindQueue('payment.failed', 'dlx.payment', 'payment');
// 재시도 큐 설정
await channel.assertQueue('payment.retry', {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'payment.process',
'x-message-ttl': 60000 // 1분 후 재시도
}
});
// 메인 처리 큐 설정
await channel.assertQueue('payment.process', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx.payment',
'x-dead-letter-routing-key': 'payment'
}
});
return { connection, channel };
}
async function setupConsumer(channel) {
channel.prefetch(1);
channel.consume('payment.process', async (msg) => {
try {
const payment = JSON.parse(msg.content.toString());
console.log(`결제 처리 중: ${payment.id}`);
// 결제 처리 로직
const processed = await processPayment(payment);
if (processed) {
// 성공적으로 처리됨
channel.ack(msg);
console.log(`결제 성공: ${payment.id}`);
} else {
// 일시적 오류, 재시도 필요
const retryCount = (msg.properties.headers['x-retry-count'] || 0) + 1;
if (retryCount <= 3) {
// 재시도 큐로 전송
channel.publish('', 'payment.retry', msg.content, {
persistent: true,
headers: { 'x-retry-count': retryCount }
});
channel.ack(msg);
console.log(`결제 재시도 예약 (${retryCount}/3): ${payment.id}`);
} else {
// 최대 재시도 횟수 초과, 데드 레터 큐로 이동
channel.nack(msg, false, false);
console.log(`결제 실패, 데드 레터로 이동: ${payment.id}`);
}
}
} catch (error) {
// 처리 중 오류 발생
console.error(`결제 처리 오류: ${error.message}`);
channel.nack(msg, false, false);
}
});
}
function processPayment(payment) {
// 실제 결제 처리 로직
return Math.random() > 0.3; // 70% 성공률 시뮬레이션
}
async function main() {
const { connection, channel } = await setupQueues();
await setupConsumer(channel);
console.log('결제 처리 서비스 실행 중...');
}
main().catch(console.error);
|
3. 메시지 유효기간(TTL, Time-To-Live)#
메시지가 큐에 머무를 수 있는 최대 시간을 설정하는 기능으로, 오래된 메시지가 시스템 리소스를 차지하지 않도록 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
| // C#에서 메시지 TTL 설정 예시 (RabbitMQ.Client 사용)
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
public class NotificationSender
{
private readonly IModel _channel;
public NotificationSender(IModel channel)
{
_channel = channel;
// TTL이 적용된 큐 선언
var queueArgs = new Dictionary<string, object>
{
{ "x-message-ttl", 300000 } // 5분 TTL
};
_channel.QueueDeclare(
queue: "notifications.transient",
durable: true,
exclusive: false,
autoDelete: false,
arguments: queueArgs
);
}
public void SendTransientNotification(string userId, string message)
{
var notification = new
{
UserId = userId,
Message = message,
Timestamp = DateTime.UtcNow
};
var body = Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(notification));
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish(
exchange: "",
routingKey: "notifications.transient",
basicProperties: properties,
body: body
);
}
public void SendPriorityNotification(string userId, string message, int expirationSeconds)
{
var notification = new
{
UserId = userId,
Message = message,
Timestamp = DateTime.UtcNow
};
var body = Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(notification));
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.Expiration = (expirationSeconds * 1000).ToString(); // 개별 메시지 TTL 설정
_channel.BasicPublish(
exchange: "",
routingKey: "notifications.priority",
basicProperties: properties,
body: body
);
}
}
|
메시지 지연 전송(Delayed Delivery)#
메시지를 즉시 처리하지 않고 일정 시간 후에 처리하도록 예약하는 기능이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| // Java에서 지연 메시지 구현 예시 (Spring AMQP 사용)
@Configuration
public class RabbitMQConfig {
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue reminderQueue() {
return new Queue("reminders.queue", true);
}
@Bean
public Binding reminderBinding(Queue reminderQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(reminderQueue)
.to(delayExchange)
.with("reminders.routing")
.noargs();
}
}
@Service
public class ReminderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public ReminderService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void scheduleReminder(String userId, String message, long delayInSeconds) {
Reminder reminder = new Reminder(
UUID.randomUUID().toString(),
userId,
message,
LocalDateTime.now().plusSeconds(delayInSeconds)
);
rabbitTemplate.convertAndSend(
"delay.exchange",
"reminders.routing",
reminder,
message -> {
message.getMessageProperties().setHeader("x-delay", delayInSeconds * 1000);
return message;
}
);
System.out.println("알림 예약됨: " + delayInSeconds + "초 후 " + userId + "님에게 전송");
}
}
|
메시징 큐 기술 비교#
RabbitMQ#
- 특징: AMQP 프로토콜 지원, 다양한 메시징 패턴, 고급 라우팅
- 장점: 유연한 라우팅, 플러그인 생태계, 관리 UI
- 적합한 경우: 복잡한 라우팅 요구사항, 다양한 메시징 패턴 필요 시
Apache Kafka#
- 특징: 분산 스트리밍 플랫폼, 높은 처리량, 로그 보존
- 장점: 확장성, 내구성, 실시간 스트림 처리
- 적합한 경우: 대용량 이벤트 스트리밍, 실시간 분석, 이벤트 소싱
Amazon SQS/SNS#
- 특징: 관리형 메시징 서비스, 서버리스 운영
- 장점: 운영 오버헤드 없음, 자동 확장
- 적합한 경우: AWS 인프라 사용, 최소한의 운영 복잡성 추구
Redis Pub/Sub#
- 특징: 인메모리 데이터 스토어 기반 메시징
- 장점: 빠른 처리 속도, 간단한 설정
- 적합한 경우: 짧은 지연 시간 요구사항, 간단한 메시징 요구사항
API 시스템에서 메시징 큐 사용 사례#
비동기 작업 처리#
오래 걸리는 작업을 비동기적으로 처리하여 API의 응답 시간을 개선할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| // Kotlin Spring Boot 비동기 작업 처리 예시
@RestController
@RequestMapping("/api/reports")
class ReportController(
private val reportPublisher: ReportPublisher,
private val reportRepository: ReportRepository
) {
@PostMapping
fun generateReport(@RequestBody request: ReportRequest): ResponseEntity<ReportResponse> {
// 보고서 작업 생성
val reportJob = ReportJob(
id = UUID.randomUUID().toString(),
userId = request.userId,
parameters = request.parameters,
status = ReportStatus.PENDING,
createdAt = LocalDateTime.now()
)
// 작업 저장
reportRepository.save(reportJob)
// 작업을 큐에 발행
reportPublisher.publishReportJob(reportJob)
// 즉시 응답 반환
return ResponseEntity
.accepted()
.body(ReportResponse(
jobId = reportJob.id,
status = reportJob.status,
statusUrl = "/api/reports/${reportJob.id}"
))
}
@GetMapping("/{jobId}")
fun getReportStatus(@PathVariable jobId: String): ResponseEntity<ReportJobStatus> {
val reportJob = reportRepository.findById(jobId)
?: return ResponseEntity.notFound().build()
return ResponseEntity.ok(
ReportJobStatus(
jobId = reportJob.id,
status = reportJob.status,
progress = reportJob.progress,
result = reportJob.result,
createdAt = reportJob.createdAt,
completedAt = reportJob.completedAt
)
)
}
}
|
시스템 간 통합#
여러 시스템 간의 느슨한 결합을 통해 확장성과 유연성을 제공한다. 각 시스템은 독립적으로 진화하면서도 메시지를 통해 효과적으로 통신할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| // 주문 시스템에서 결제 시스템으로 메시지 전송 예시 (Java)
@Service
public class OrderCompletionService {
private final JmsTemplate jmsTemplate;
private final OrderRepository orderRepository;
@Autowired
public OrderCompletionService(JmsTemplate jmsTemplate, OrderRepository orderRepository) {
this.jmsTemplate = jmsTemplate;
this.orderRepository = orderRepository;
}
@Transactional
public void completeOrder(String orderId, PaymentDetails paymentDetails) {
// 주문 상태 업데이트
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.setStatus(OrderStatus.PAID);
order.setPaymentDetails(paymentDetails);
order.setUpdatedAt(LocalDateTime.now());
orderRepository.save(order);
// 결제 완료 메시지를 다른 시스템에 전송
OrderPaidEvent event = new OrderPaidEvent(
orderId,
order.getCustomerId(),
paymentDetails.getAmount(),
paymentDetails.getPaymentMethod(),
order.getItems(),
order.getShippingAddress(),
LocalDateTime.now()
);
// 배송 시스템에 알림
jmsTemplate.convertAndSend("shipping.orders.paid", event);
// 재고 시스템에 알림
jmsTemplate.convertAndSend("inventory.orders.paid", event);
// 고객 알림 시스템에 알림
jmsTemplate.convertAndSend("notifications.orders.paid", event);
System.out.println("주문 " + orderId + " 완료 메시지 전송됨");
}
}
|
부하 분산 및 처리량 향상#
트래픽이 많은 작업을 여러 워커 인스턴스로 분산하여 시스템의 전체 처리량을 향상시킬 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
| # Python FastAPI에서 이미지 처리 부하 분산 예시
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import aio_pika
import asyncio
import uuid
import json
import os
app = FastAPI()
# RabbitMQ 연결 설정
async def get_rabbitmq_connection():
return await aio_pika.connect_robust("amqp://guest:guest@localhost/")
# 이미지 업로드 및 처리 요청 API
@app.post("/api/images/process")
async def process_image(file: UploadFile = File(...), background_tasks: BackgroundTasks):
# 파일 저장
file_id = str(uuid.uuid4())
file_path = f"uploads/{file_id}_{file.filename}"
os.makedirs("uploads", exist_ok=True)
with open(file_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
# 이미지 처리 작업 큐에 전송
background_tasks.add_task(send_to_processing_queue, file_id, file_path)
return JSONResponse(
status_code=202,
content={
"message": "이미지 처리가 시작되었습니다.",
"file_id": file_id,
"status_url": f"/api/images/status/{file_id}"
}
)
# 이미지 처리 작업을 큐에 전송
async def send_to_processing_queue(file_id: str, file_path: str):
connection = await get_rabbitmq_connection()
async with connection:
channel = await connection.channel()
# 이미지 처리 큐 선언
queue = await channel.declare_queue(
"image_processing",
durable=True
)
# 메시지 생성 및 전송
message_body = json.dumps({
"file_id": file_id,
"file_path": file_path,
"timestamp": str(datetime.datetime.now())
}).encode()
await channel.default_exchange.publish(
aio_pika.Message(
body=message_body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
),
routing_key="image_processing"
)
print(f"이미지 처리 요청이 큐에 전송됨: {file_id}")
# 이미지 처리 상태 확인 API
@app.get("/api/images/status/{file_id}")
async def get_image_status(file_id: str):
# 실제 구현에서는 DB에서 처리 상태 조회
# 여기서는 간단한 예시만 표시
return {
"file_id": file_id,
"status": "processing", # 실제로는 DB에서 현재 상태 조회
"message": "이미지가 처리 중입니다."
}
|
장애 격리 및 복원력#
일시적인 장애나 부하 증가에도 시스템이 계속 작동할 수 있도록 메시지를 버퍼링하고 재시도 메커니즘을 구현한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
| // TypeScript에서 장애 복원력 패턴 구현 예시
import { Channel, Connection, connect } from 'amqplib';
class ResilientEmailService {
private connection: Connection | null = null;
private channel: Channel | null = null;
private readonly maxRetries = 3;
private isConnecting = false;
private pendingMessages: any[] = [];
// 연결 설정
async setupConnection(): Promise<void> {
if (this.isConnecting) return;
this.isConnecting = true;
try {
// RabbitMQ 연결
this.connection = await connect('amqp://localhost');
// 연결 오류 처리
this.connection.on('error', (err) => {
console.error('RabbitMQ 연결 오류:', err);
this.resetConnection();
});
this.connection.on('close', () => {
console.log('RabbitMQ 연결이 닫혔습니다. 재연결 시도 중...');
this.resetConnection();
setTimeout(() => this.setupConnection(), 5000);
});
// 채널 생성
this.channel = await this.connection.createChannel();
// 이메일 큐 설정
await this.channel.assertQueue('emails', { durable: true });
// 재시도 큐 설정
await this.channel.assertQueue('emails.retry', {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'emails',
'x-message-ttl': 60000 // 1분 후 재시도
}
});
// 데드 레터 큐 설정
await this.channel.assertQueue('emails.failed', { durable: true });
console.log('RabbitMQ 연결 및 채널 설정 완료');
// 대기 중인 메시지 처리
this.processPendingMessages();
} catch (error) {
console.error('RabbitMQ 연결 설정 실패:', error);
this.resetConnection();
setTimeout(() => this.setupConnection(), 5000);
} finally {
this.isConnecting = false;
}
}
// 연결 리셋
private resetConnection(): void {
this.channel = null;
this.connection = null;
}
// 대기 중인 메시지 처리
private async processPendingMessages(): Promise<void> {
if (!this.channel || this.pendingMessages.length === 0) return;
const messages = [...this.pendingMessages];
this.pendingMessages = [];
for (const msg of messages) {
await this.sendEmailToQueue(msg.email, msg.retryCount);
}
}
// 이메일 메시지를 큐에 전송
async sendEmail(to: string, subject: string, body: string): Promise<void> {
const email = { to, subject, body, timestamp: new Date().toISOString() };
try {
if (!this.channel) {
await this.setupConnection();
if (!this.channel) {
// 여전히 연결이 없는 경우 대기 중인 메시지에 추가
this.pendingMessages.push({ email, retryCount: 0 });
return;
}
}
await this.sendEmailToQueue(email, 0);
} catch (error) {
console.error('이메일 전송 오류:', error);
// 오류 발생 시 대기 중인 메시지에 추가
this.pendingMessages.push({ email, retryCount: 0 });
// 연결 재설정 시도
this.resetConnection();
setTimeout(() => this.setupConnection(), 5000);
}
}
// 이메일을 큐에 실제로 전송
private async sendEmailToQueue(email: any, retryCount: number): Promise<void> {
if (!this.channel) throw new Error('RabbitMQ 채널이 설정되지 않았습니다.');
const message = {
...email,
retryCount,
sentAt: new Date().toISOString()
};
// 메시지를 이메일 큐에 발행
this.channel.sendToQueue(
'emails',
Buffer.from(JSON.stringify(message)),
{
persistent: true,
headers: { 'x-retry-count': retryCount }
}
);
console.log(`이메일이 큐에 전송됨: ${email.to}, 재시도: ${retryCount}`);
}
// 이메일 소비자 설정
async setupConsumer(): Promise<void> {
if (!this.channel) {
await this.setupConnection();
if (!this.channel) throw new Error('RabbitMQ 채널이 설정되지 않았습니다.');
}
// 이메일 처리를 위한 소비자 설정
this.channel.prefetch(1);
this.channel.consume('emails', async (msg) => {
if (!msg) return;
try {
const email = JSON.parse(msg.content.toString());
console.log(`이메일 처리 중: ${email.to}`);
// 실제 이메일 전송 로직
const success = await this.deliverEmail(email);
if (success) {
// 성공적으로 처리됨
this.channel?.ack(msg);
console.log(`이메일 전송 성공: ${email.to}`);
} else {
// 전송 실패, 재시도 필요
const retryCount = (msg.properties.headers['x-retry-count'] || 0) + 1;
if (retryCount <= this.maxRetries) {
// 재시도 큐로 전송
this.channel?.publish(
'',
'emails.retry',
msg.content,
{
persistent: true,
headers: { 'x-retry-count': retryCount }
}
);
this.channel?.ack(msg);
console.log(`이메일 재시도 예약 (${retryCount}/${this.maxRetries}): ${email.to}`);
} else {
// 최대 재시도 횟수 초과, 실패 큐로 이동
this.channel?.publish('', 'emails.failed', msg.content, { persistent: true });
this.channel?.ack(msg);
console.log(`이메일 전송 실패, 데드 레터로 이동: ${email.to}`);
}
}
} catch (error) {
console.error('이메일 처리 중 오류:', error);
// 처리 오류, 데드 레터 큐로 이동
this.channel?.nack(msg, false, false);
}
});
console.log('이메일 소비자가 시작되었습니다.');
}
// 실제 이메일 전송 로직 (외부 서비스 호출)
private async deliverEmail(email: any): Promise<boolean> {
try {
// 실제로는 SMTP 서비스 등을 호출
console.log(`이메일 전송 시도: ${email.to}, 제목: ${email.subject}`);
// 성공/실패 시뮬레이션 (80% 성공률)
const isSuccessful = Math.random() > 0.2;
if (isSuccessful) {
return true;
} else {
console.log(`이메일 전송 일시적 실패: ${email.to}`);
return false;
}
} catch (error) {
console.error(`이메일 전송 중 오류: ${error}`);
return false;
}
}
}
// 사용 예시
async function main() {
const emailService = new ResilientEmailService();
await emailService.setupConnection();
await emailService.setupConsumer();
// 이메일 전송 테스트
await emailService.sendEmail(
'user@example.com',
'중요 알림',
'귀하의 계정에 중요한 변경사항이 있습니다.'
);
}
main().catch(console.error);
|
메시징 큐 구현 시 고려사항#
메시지 보장성(Delivery Guarantees)#
메시지 전송의 보장 수준에 따라 시스템 설계가 달라진다.
- At-most-once 전달: 메시지가 최대 한 번 전달되거나 손실될 수 있음 (최소한의 오버헤드)
- At-least-once 전달: 메시지가 최소 한 번 전달되지만 중복 가능성 있음 (멱등성 필요)
- Exactly-once 전달: 메시지가 정확히 한 번 전달됨 (가장 높은 오버헤드)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| // At-least-once 전송을 위한 Java Spring 설정 예시
@Configuration
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 게시자 확인 활성화
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
// 게시자 확인 콜백
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("메시지가 브로커에 성공적으로 전달됨");
} else {
System.out.println("메시지 전달 실패: " + cause);
// 여기서 메시지 재시도 로직 구현
}
});
// 반환된 메시지 처리
template.setReturnsCallback(returned -> {
System.out.println("메시지 반환됨: " + returned.getMessage() +
", replyCode: " + returned.getReplyCode() +
", replyText: " + returned.getReplyText() +
", exchange: " + returned.getExchange() +
", routingKey: " + returned.getRoutingKey());
// 라우팅 실패 처리 로직
});
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 자동 확인 비활성화 (수동 확인 사용)
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 동시성 설정
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
// 소비자 당 메시지 수
factory.setPrefetchCount(1);
return factory;
}
}
|
메시지 순서(Ordering)#
특정 시나리오에서는 메시지 처리 순서가 중요할 수 있다.
순서 보장을 위한 전략은 다음과 같다.
- 동일한 파티션 키(Partition Key)를 사용하여 관련 메시지가 같은 소비자에게 전달
- 순차 번호(Sequence Number)를 메시지에 포함하여 소비자가 순서를 확인
- 단일 소비자 모델 사용 (확장성 제한)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
| // C#에서 메시지 순서 보장 구현 예시
public class OrderedMessageProcessor
{
private readonly ConcurrentDictionary<string, SemaphoreSlim> _entityLocks = new();
private readonly ConcurrentDictionary<string, long> _lastProcessedSequence = new();
public async Task ProcessMessageAsync(Message message)
{
// 파티션 키 추출 (예: 고객 ID)
string partitionKey = message.Headers["partition-key"];
long sequenceNumber = long.Parse(message.Headers["sequence-number"]);
// 엔티티별 잠금 획득
SemaphoreSlim entityLock = _entityLocks.GetOrAdd(partitionKey, _ => new SemaphoreSlim(1, 1));
await entityLock.WaitAsync();
try
{
// 마지막으로 처리된 시퀀스 번호 확인
_lastProcessedSequence.TryGetValue(partitionKey, out long lastSequence);
// 순서가 맞지 않는 메시지는 지연 처리 또는 거부
if (sequenceNumber < lastSequence + 1)
{
// 이미 처리된 메시지 - 무시
Console.WriteLine($"이미 처리된 메시지 무시: {partitionKey}, 시퀀스: {sequenceNumber}");
return;
}
else if (sequenceNumber > lastSequence + 1)
{
// 순서가 맞지 않는 메시지 - 다시 큐에 넣거나 지연 처리
Console.WriteLine($"순서가 맞지 않는 메시지: {partitionKey}, 예상: {lastSequence + 1}, 실제: {sequenceNumber}");
throw new OutOfOrderMessageException(partitionKey, lastSequence + 1, sequenceNumber);
}
// 정상적인 순서의 메시지 처리
await ProcessMessageInternalAsync(message);
// 처리된 시퀀스 번호 업데이트
_lastProcessedSequence[partitionKey] = sequenceNumber;
Console.WriteLine($"메시지 처리 완료: {partitionKey}, 시퀀스: {sequenceNumber}");
}
finally
{
entityLock.Release();
}
}
private async Task ProcessMessageInternalAsync(Message message)
{
// 실제 메시지 처리 로직
string messageBody = Encoding.UTF8.GetString(message.Body.ToArray());
Console.WriteLine($"메시지 처리 중: {messageBody}");
await Task.Delay(100); // 처리 시간 시뮬레이션
}
}
// 순서가 맞지 않는 메시지 예외
public class OutOfOrderMessageException : Exception
{
public string PartitionKey { get; }
public long ExpectedSequence { get; }
public long ActualSequence { get; }
public OutOfOrderMessageException(string partitionKey, long expectedSequence, long actualSequence)
: base($"순서가 맞지 않는 메시지: {partitionKey}, 예상: {expectedSequence}, 실제: {actualSequence}")
{
PartitionKey = partitionKey;
ExpectedSequence = expectedSequence;
ActualSequence = actualSequence;
}
}
|
최적화#
메시징 시스템의 성능을 최적화하기 위한 주요 고려사항은 아래와 같다.
- 배치 처리(Batching): 여러 메시지를 묶어서 전송하여 네트워크 오버헤드 감소
- 미리 가져오기(Prefetching): 소비자가 메시지를 미리 가져와 처리 지연 시간 감소
- 압축(Compression): 대용량 메시지 압축으로 네트워크 대역폭 절약
- 동시성(Concurrency): 적절한 수의 소비자 쓰레드로 처리량 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
| # Python에서 배치 처리 구현 예시 (Kafka 사용)
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic
import json
import time
from typing import List, Dict, Any
class BatchMessageProducer:
def __init__(self, bootstrap_servers: str, topic: str, batch_size: int = 100, flush_interval: int = 5):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'batch.size': 16384, # 배치 크기(바이트)
'linger.ms': 50, # 배치 지연 시간(밀리초)
'compression.type': 'snappy' # 메시지 압축
})
self.topic = topic
self.batch_size = batch_size
self.flush_interval = flush_interval
self.batch_buffer: List[Dict[str, Any]] = []
self.last_flush_time = time.time()
# 토픽 생성 확인
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
topics = admin_client.list_topics().topics
if self.topic not in topics:
topic_list = [NewTopic(
self.topic,
num_partitions=6,
replication_factor=1
)]
admin_client.create_topics(topic_list)
print(f"토픽 생성됨: {self.topic}")
def delivery_report(self, err, msg):
if err is not None:
print(f"메시지 전송 실패: {err}")
else:
print(f"메시지 전송 성공: {msg.topic()} [{msg.partition()}] @ 오프셋 {msg.offset()}")
def add_message(self, message: Dict[str, Any]):
"""배치 버퍼에 메시지 추가"""
self.batch_buffer.append(message)
# 배치 크기에 도달하거나 플러시 간격이 경과하면 전송
current_time = time.time()
if (len(self.batch_buffer) >= self.batch_size or
current_time - self.last_flush_time >= self.flush_interval):
self.flush()
def flush(self):
"""배치 버퍼의 모든 메시지 전송"""
if not self.batch_buffer:
return
print(f"배치 플러시: {len(self.batch_buffer)} 메시지")
for message in self.batch_buffer:
# 메시지 직렬화
message_value = json.dumps(message).encode('utf-8')
# 파티션 키 설정 (메시지 순서가 중요한 경우)
partition_key = str(message.get('entity_id', '')).encode('utf-8')
# 메시지 생산
self.producer.produce(
topic=self.topic,
key=partition_key,
value=message_value,
callback=self.delivery_report
)
# 생산자 플러시
self.producer.flush()
# 버퍼 비우기 및 타임스탬프 업데이트
self.batch_buffer.clear()
self.last_flush_time = time.time()
def close(self):
"""남은 메시지 플러시 및 생산자 종료"""
self.flush()
self.producer.flush()
class ParallelMessageConsumer:
def __init__(self, bootstrap_servers: str, topic: str, group_id: str, num_workers: int = 4):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.group_id = group_id
self.num_workers = num_workers
self.running = False
self.workers = []
def create_consumer(self):
"""Kafka 소비자 생성"""
return Consumer({
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.interval.ms': 300000, # 5분
'session.timeout.ms': 30000, # 30초
'fetch.min.bytes': 1024, # 최소 가져오기 크기
'fetch.max.wait.ms': 500 # 최대 대기 시간
})
def process_message(self, message):
"""메시지 처리 로직 (오버라이드 필요)"""
try:
# 메시지 역직렬화
message_value = json.loads(message.value().decode('utf-8'))
print(f"메시지 처리: {message_value}")
# 실제 처리 로직 구현
time.sleep(0.1) # 처리 시간 시뮬레이션
return True
except Exception as e:
print(f"메시지 처리 오류: {e}")
return False
def worker_loop(self, worker_id):
"""작업자 스레드 메인 루프"""
consumer = self.create_consumer()
consumer.subscribe([self.topic])
print(f"작업자 {worker_id} 시작됨, 토픽: {self.topic}")
try:
while self.running:
# 메시지 폴링
messages = consumer.consume(num_messages=10, timeout=1.0)
if not messages:
continue
# 배치 처리
processed_offsets = {}
for message in messages:
if message.error():
print(f"소비자 오류: {message.error()}")
continue
# 메시지 처리
success = self.process_message(message)
if success:
# 오프셋 추적
partition = message.partition()
if partition not in processed_offsets:
processed_offsets[partition] = []
processed_offsets[partition].append(message.offset())
# 성공한 오프셋 커밋
for partition, offsets in processed_offsets.items():
if offsets:
consumer.commit(offsets=[(self.topic, partition, max(offsets) + 1)])
finally:
consumer.close()
print(f"작업자 {worker_id} 종료됨")
def start(self):
"""병렬 소비자 시작"""
self.running = True
# 작업자 스레드 시작
import threading
for i in range(self.num_workers):
worker = threading.Thread(target=self.worker_loop, args=(i,))
worker.daemon = True
worker.start()
self.workers.append(worker)
print(f"{self.num_workers}개의 병렬 작업자가 시작되었습니다.")
def stop(self):
"""소비자 중지"""
self.running = False
# 작업자 스레드가 종료될 때까지 대기
for worker in self.workers:
worker.join(timeout=5.0)
print("모든 소비자 작업자가 종료되었습니다.")
|
메시징 큐 모니터링 및 운영#
효과적인 메시징 시스템 운영을 위해서는 적절한 모니터링과 관리가 필수적이다.
다음은 주요 모니터링 포인트와 관리 전략이다.
주요 모니터링 지표#
메시징 시스템의 건강 상태와 성능을 모니터링하기 위한 핵심 지표들이다.
- 큐 깊이(Queue Depth): 큐에 적재된 메시지 수를 추적하여 처리 지연 감지
- 메시지 처리율(Processing Rate): 초당 처리되는 메시지 수를 측정하여 성능 평가
- 메시지 지연 시간(Latency): 메시지가 큐에 들어온 후 처리되기까지의 시간 측정
- 오류율(Error Rate): 처리 실패 및 재시도 횟수 모니터링
- 소비자 상태(Consumer Health): 활성 소비자 수와 소비자 지연 추적
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
| # Prometheus와 Grafana를 사용한 RabbitMQ 모니터링 예시 (Python)
import time
import threading
from prometheus_client import start_http_server, Counter, Gauge, Histogram
import pika
# Prometheus 메트릭 정의
MESSAGE_COUNT = Counter('rabbitmq_messages_total', 'Total number of messages processed', ['queue', 'status'])
QUEUE_DEPTH = Gauge('rabbitmq_queue_depth', 'Number of messages in queue', ['queue'])
PROCESSING_TIME = Histogram('rabbitmq_processing_seconds', 'Time spent processing messages', ['queue'])
ACTIVE_CONSUMERS = Gauge('rabbitmq_active_consumers', 'Number of active consumers', ['queue'])
class MonitoredConsumer:
def __init__(self, connection_params, queue_name):
self.connection_params = connection_params
self.queue_name = queue_name
self.connection = None
self.channel = None
self.thread = None
self.running = False
# 소비자 활성화 상태 표시
ACTIVE_CONSUMERS.labels(queue=queue_name).set(0)
def connect(self):
"""RabbitMQ에 연결"""
try:
self.connection = pika.BlockingConnection(self.connection_params)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue_name, durable=True)
self.channel.basic_qos(prefetch_count=1)
return True
except Exception as e:
print(f"연결 오류: {e}")
return False
def start_consuming(self):
"""메시지 소비 시작"""
if not self.connect():
print("RabbitMQ 연결 실패. 30초 후 재시도합니다.")
time.sleep(30)
return self.start_consuming()
self.running = True
ACTIVE_CONSUMERS.labels(queue=self.queue_name).set(1)
def callback(ch, method, properties, body):
# 처리 시작 시간 기록
start_time = time.time()
try:
# 메시지 처리 로직
print(f"메시지 수신: {body.decode()}")
# 처리 시간 시뮬레이션
time.sleep(0.5)
# 성공적으로 처리됨
ch.basic_ack(delivery_tag=method.delivery_tag)
MESSAGE_COUNT.labels(queue=self.queue_name, status="success").inc()
# 처리 시간 기록
PROCESSING_TIME.labels(queue=self.queue_name).observe(time.time() - start_time)
except Exception as e:
# 처리 실패
print(f"처리 오류: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
MESSAGE_COUNT.labels(queue=self.queue_name, status="error").inc()
# 큐 깊이 모니터링 스레드
def monitor_queue_depth():
while self.running:
try:
# 큐 상태 확인
queue = self.channel.queue_declare(queue=self.queue_name, passive=True)
depth = queue.method.message_count
QUEUE_DEPTH.labels(queue=self.queue_name).set(depth)
except Exception as e:
print(f"큐 모니터링 오류: {e}")
QUEUE_DEPTH.labels(queue=self.queue_name).set(0)
time.sleep(5) # 5초마다 업데이트
# 모니터링 스레드 시작
monitor_thread = threading.Thread(target=monitor_queue_depth)
monitor_thread.daemon = True
monitor_thread.start()
# 소비자 시작
self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback)
try:
print(f"큐 {self.queue_name}에서 메시지 소비 시작")
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop()
except Exception as e:
print(f"소비자 오류: {e}")
self.stop()
# 재연결 시도
time.sleep(10)
self.start_consuming()
def start(self):
"""별도 스레드에서 소비자 시작"""
self.thread = threading.Thread(target=self.start_consuming)
self.thread.daemon = True
self.thread.start()
def stop(self):
"""소비자 중지"""
self.running = False
ACTIVE_CONSUMERS.labels(queue=self.queue_name).set(0)
if self.channel:
try:
self.channel.stop_consuming()
except:
pass
if self.connection and self.connection.is_open:
try:
self.connection.close()
except:
pass
print(f"큐 {self.queue_name}의 소비자가 중지되었습니다.")
# 메인 애플리케이션
def main():
# Prometheus 메트릭 서버 시작
start_http_server(8000)
print("Prometheus 메트릭 서버가 포트 8000에서 실행 중입니다.")
# RabbitMQ 연결 파라미터
connection_params = pika.ConnectionParameters(
host='localhost',
heartbeat=60,
blocked_connection_timeout=300
)
# 모니터링되는 소비자 시작
consumers = []
for queue_name in ['orders', 'notifications', 'emails']:
consumer = MonitoredConsumer(connection_params, queue_name)
consumer.start()
consumers.append(consumer)
# 애플리케이션 실행 유지
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("애플리케이션 종료 중...")
for consumer in consumers:
consumer.stop()
if __name__ == "__main__":
main()
|
경보 및 자동화 대응#
메시징 시스템에서 발생할 수 있는 문제에 대한 경보와 자동화된 대응 전략은 아래와 같다.
- 큐 적체(Queue Buildup): 큐 깊이가 임계값을 초과할 경우 경보 발생
- 데드 레터 증가(Dead-Letter Growth): 데드 레터 큐에 메시지가 축적될 경우 경보
- 소비자 장애(Consumer Failure): 활성 소비자 수가 감소할 경우 자동 복구
- 처리 지연(Processing Delays): 메시지 처리 지연이 임계값을 초과할 경우 경보
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
| // Node.js에서 RabbitMQ 모니터링 및 자동화된 대응 예시
const amqp = require('amqplib');
const axios = require('axios');
const cron = require('node-cron');
class MessageQueueMonitor {
constructor(config) {
this.config = config;
this.connection = null;
this.channel = null;
this.queuesStatus = {};
this.alerts = [];
}
async connect() {
try {
this.connection = await amqp.connect(this.config.rabbitmq.url);
this.channel = await this.connection.createChannel();
console.log('RabbitMQ에 연결되었습니다.');
return true;
} catch (error) {
console.error('RabbitMQ 연결 오류:', error.message);
return false;
}
}
async monitorQueues() {
if (!this.channel) {
const connected = await this.connect();
if (!connected) {
console.error('모니터링을 시작할 수 없습니다: RabbitMQ에 연결되지 않음');
return;
}
}
try {
// 모든 큐 상태 확인
for (const queueName of this.config.queues) {
try {
const queueInfo = await this.channel.assertQueue(queueName, { durable: true });
// 이전 상태 저장
const prevStatus = this.queuesStatus[queueName] || {};
// 현재 상태 업데이트
this.queuesStatus[queueName] = {
name: queueName,
messageCount: queueInfo.messageCount,
consumerCount: queueInfo.consumerCount,
timestamp: new Date(),
previousCount: prevStatus.messageCount || 0,
growth: prevStatus.messageCount !== undefined ?
queueInfo.messageCount - prevStatus.messageCount : 0
};
// 경보 확인
this.checkAlerts(queueName);
console.log(`큐 ${queueName}: 메시지=${queueInfo.messageCount}, 소비자=${queueInfo.consumerCount}`);
} catch (error) {
console.error(`큐 ${queueName} 모니터링 오류:`, error.message);
}
}
// 데드 레터 큐 확인
for (const dlqName of this.config.deadLetterQueues) {
try {
const dlqInfo = await this.channel.assertQueue(dlqName, { durable: true });
this.queuesStatus[dlqName] = {
name: dlqName,
messageCount: dlqInfo.messageCount,
consumerCount: dlqInfo.consumerCount,
timestamp: new Date(),
isDLQ: true
};
// 데드 레터 큐 경보 확인
if (dlqInfo.messageCount > this.config.alerts.deadLetterThreshold) {
this.triggerAlert({
type: 'dead_letter_buildup',
queue: dlqName,
messageCount: dlqInfo.messageCount,
threshold: this.config.alerts.deadLetterThreshold,
timestamp: new Date()
});
}
console.log(`데드 레터 큐 ${dlqName}: 메시지=${dlqInfo.messageCount}`);
} catch (error) {
console.error(`데드 레터 큐 ${dlqName} 모니터링 오류:`, error.message);
}
}
// 메트릭 보고
this.reportMetrics();
} catch (error) {
console.error('큐 모니터링 오류:', error.message);
// 연결 재시도
this.connection = null;
this.channel = null;
}
}
checkAlerts(queueName) {
const status = this.queuesStatus[queueName];
// 큐 적체 경보
if (status.messageCount > this.config.alerts.queueSizeThreshold) {
this.triggerAlert({
type: 'queue_buildup',
queue: queueName,
messageCount: status.messageCount,
threshold: this.config.alerts.queueSizeThreshold,
timestamp: new Date()
});
// 자동화된 대응 실행
if (this.config.autoRemediation.enabled) {
this.scaleUpConsumers(queueName);
}
}
// 메시지 증가율 경보
if (status.growth > this.config.alerts.growthRateThreshold) {
this.triggerAlert({
type: 'queue_growth_rate',
queue: queueName,
growth: status.growth,
threshold: this.config.alerts.growthRateThreshold,
timestamp: new Date()
});
}
// 소비자 부족 경보
if (status.consumerCount < this.config.alerts.minConsumers) {
this.triggerAlert({
type: 'consumer_shortage',
queue: queueName,
consumerCount: status.consumerCount,
minRequired: this.config.alerts.minConsumers,
timestamp: new Date()
});
// 자동화된 대응 실행
if (this.config.autoRemediation.enabled) {
this.restartConsumers(queueName);
}
}
}
triggerAlert(alert) {
console.warn(`경보 발생: ${alert.type} - ${alert.queue}`);
this.alerts.push(alert);
// 경보 중복 방지 (같은 유형의 경보가 5분 내에 발생한 경우)
const recentSimilarAlert = this.alerts
.filter(a => a.type === alert.type && a.queue === alert.queue)
.filter(a => (new Date() - a.timestamp) < 5 * 60 * 1000)
.length > 1;
if (!recentSimilarAlert) {
// 알림 전송 (Slack, 이메일 등)
this.sendNotification(alert);
}
}
async sendNotification(alert) {
// Slack 웹훅 알림 전송 예시
try {
if (this.config.notifications.slack.enabled) {
const severity = alert.type.includes('buildup') || alert.type.includes('shortage')
? '🔴 심각' : '🟠 경고';
await axios.post(this.config.notifications.slack.webhookUrl, {
text: `${severity}: ${alert.type}`,
blocks: [
{
type: 'section',
text: {
type: 'mrkdwn',
text: `*${severity}: ${alert.type}*`
}
},
{
type: 'section',
fields: [
{
type: 'mrkdwn',
text: `*큐:*\n${alert.queue}`
},
{
type: 'mrkdwn',
text: `*메시지 수:*\n${alert.messageCount || 'N/A'}`
},
{
type: 'mrkdwn',
text: `*임계값:*\n${alert.threshold || alert.minRequired || 'N/A'}`
},
{
type: 'mrkdwn',
text: `*시간:*\n${alert.timestamp.toISOString()}`
}
]
}
]
});
console.log(`Slack 알림 전송됨: ${alert.type} - ${alert.queue}`);
}
} catch (error) {
console.error('알림 전송 오류:', error.message);
}
}
async reportMetrics() {
// 메트릭 보고 (Prometheus, CloudWatch 등)
try {
if (this.config.metrics.enabled) {
const metrics = Object.values(this.queuesStatus).map(status => ({
queueName: status.name,
messageCount: status.messageCount,
consumerCount: status.consumerCount,
isDLQ: status.isDLQ || false,
timestamp: status.timestamp.getTime()
}));
// 메트릭 API로 전송
await axios.post(this.config.metrics.endpoint, { metrics });
console.log('메트릭 보고 완료');
}
} catch (error) {
console.error('메트릭 보고 오류:', error.message);
}
}
async scaleUpConsumers(queueName) {
console.log(`큐 ${queueName}에 대한 소비자 확장 시작`);
try {
// Kubernetes API 호출 예시 (실제 환경에 맞게 조정 필요)
if (this.config.autoRemediation.kubernetes.enabled) {
const k8sApi = this.config.autoRemediation.kubernetes.endpoint;
const deployment = this.config.autoRemediation.kubernetes.deployments[queueName];
if (deployment) {
await axios.patch(
`${k8sApi}/namespaces/${deployment.namespace}/deployments/${deployment.name}/scale`,
{
spec: {
replicas: deployment.maxReplicas || 5
}
},
{
headers: {
'Authorization': `Bearer ${this.config.autoRemediation.kubernetes.token}`,
'Content-Type': 'application/strategic-merge-patch+json'
}
}
);
console.log(`${queueName} 소비자 수 확장 완료`);
}
}
} catch (error) {
console.error(`소비자 확장 오류:`, error.message);
}
}
async restartConsumers(queueName) {
console.log(`큐 ${queueName}에 대한 소비자 재시작 시작`);
try {
// 소비자 서비스 재시작 로직 구현
// (실제 환경에 맞는 API 호출 또는 스크립트 실행)
} catch (error) {
console.error(`소비자 재시작 오류:`, error.message);
}
}
async purgeDeadLetterQueue(dlqName) {
console.log(`데드 레터 큐 ${dlqName} 비우기 시작`);
try {
if (!this.channel) {
await this.connect();
}
// 각 메시지를 가져와서 로그로 기록 후 처리
let emptyQueue = false;
let processedCount = 0;
while (!emptyQueue) {
const message = await this.channel.get(dlqName, { noAck: false });
if (message) {
// 메시지 로깅
console.log(`DLQ 메시지: ${message.content.toString()}`);
// 메시지 확인
this.channel.ack(message);
processedCount++;
} else {
emptyQueue = true;
}
}
console.log(`데드 레터 큐 ${dlqName}에서 ${processedCount}개 메시지 처리됨`);
} catch (error) {
console.error(`데드 레터 큐 비우기 오류:`, error.message);
}
}
async start() {
// 초기 연결
await this.connect();
// 정기적인 모니터링 스케줄링
cron.schedule(this.config.monitoringInterval, () => {
this.monitorQueues();
});
// 데드 레터 큐 정기 점검
if (this.config.deadLetterCleanup.enabled) {
cron.schedule(this.config.deadLetterCleanup.schedule, async () => {
for (const dlqName of this.config.deadLetterQueues) {
await this.purgeDeadLetterQueue(dlqName);
}
});
}
console.log('메시지 큐 모니터링 시작됨');
}
async stop() {
if (this.connection) {
await this.connection.close();
}
console.log('메시지 큐 모니터링 중지됨');
}
}
// 설정 예시
const config = {
rabbitmq: {
url: 'amqp://guest:guest@localhost'
},
queues: ['orders', 'notifications', 'emails', 'payments'],
deadLetterQueues: ['orders.failed', 'notifications.failed', 'emails.failed'],
monitoringInterval: '*/1 * * * *', // 매 분마다
alerts: {
queueSizeThreshold: 1000,
growthRateThreshold: 100,
minConsumers: 2,
deadLetterThreshold: 10
},
autoRemediation: {
enabled: true,
kubernetes: {
enabled: true,
endpoint: 'https://kubernetes.default.svc',
token: 'k8s-token',
deployments: {
orders: { namespace: 'default', name: 'order-consumer', maxReplicas: 5 },
notifications: { namespace: 'default', name: 'notification-consumer', maxReplicas: 3 }
}
}
},
deadLetterCleanup: {
enabled: true,
schedule: '0 * * * *' // 매 시간마다
},
notifications: {
slack: {
enabled: true,
webhookUrl: 'https://hooks.slack.com/services/XXX/YYY/ZZZ'
}
},
metrics: {
enabled: true,
endpoint: 'http://metrics-collector:8080/api/metrics'
}
};
// 모니터링 시작
const monitor = new MessageQueueMonitor(config);
monitor.start().catch(console.error);
// 정상 종료 처리
process.on('SIGINT', async () => {
console.log('프로그램 종료 중...');
await monitor.stop();
process.exit(0);
});
|
메시징 큐의 고급 응용 패턴#
명령 쿼리 책임 분리(CQRS)와 이벤트 소싱(Event Sourcing)#
메시징 큐는 CQRS와 이벤트 소싱 패턴을 구현하는 데 핵심적인 역할을 한다. 이벤트를 저장하고 전파하여 시스템의 상태 변화를 추적한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
| // TypeScript를 사용한 CQRS 및 이벤트 소싱 예시
import { EventBus, CommandBus } from 'some-event-library';
// 명령(Command) 정의
interface CreateOrderCommand {
readonly type: 'CreateOrder';
readonly payload: {
customerId: string;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
shippingAddress: Address;
};
}
// 이벤트(Event) 정의
interface OrderCreatedEvent {
readonly type: 'OrderCreated';
readonly payload: {
orderId: string;
customerId: string;
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
totalAmount: number;
shippingAddress: Address;
createdAt: string;
};
}
// 명령 핸들러
class CreateOrderHandler {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventBus: EventBus
) {}
async handle(command: CreateOrderCommand): Promise<string> {
// 비즈니스 로직 및 유효성 검사
const { customerId, items, shippingAddress } = command.payload;
// 새 주문 생성
const order = new Order(
generateId(),
customerId,
items,
calculateTotalAmount(items),
shippingAddress,
new Date().toISOString()
);
// 주문 저장
await this.orderRepository.save(order);
// 이벤트 발행
const event: OrderCreatedEvent = {
type: 'OrderCreated',
payload: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount,
shippingAddress: order.shippingAddress,
createdAt: order.createdAt
}
};
await this.eventBus.publish(event);
return order.id;
}
}
// 이벤트 핸들러 (읽기 모델 업데이트)
class OrderCreatedEventHandler {
constructor(
private readonly orderReadModel: OrderReadModel
) {}
async handle(event: OrderCreatedEvent): Promise<void> {
const { orderId, customerId, items, totalAmount, shippingAddress, createdAt } = event.payload;
// 읽기 모델 업데이트
await this.orderReadModel.create({
id: orderId,
customerId,
items: items.map(item => ({
productId: item.productId,
quantity: item.quantity,
price: item.price,
subtotal: item.price * item.quantity
})),
totalAmount,
shippingAddress,
status: 'created',
createdAt,
updatedAt: createdAt
});
}
}
// API 컨트롤러
class OrderController {
constructor(
private readonly commandBus: CommandBus,
private readonly orderReadModel: OrderReadModel
) {}
// 명령 처리 엔드포인트
async createOrder(req, res) {
try {
const command: CreateOrderCommand = {
type: 'CreateOrder',
payload: req.body
};
const orderId = await this.commandBus.execute(command);
res.status(201).json({ orderId });
} catch (error) {
res.status(400).json({ error: error.message });
}
}
// 쿼리 엔드포인트
async getOrder(req, res) {
try {
const orderId = req.params.id;
const order = await this.orderReadModel.findById(orderId);
if (!order) {
return res.status(404).json({ error: 'Order not found' });
}
res.json(order);
} catch (error) {
res.status(500).json({ error: error.message });
}
}
}
|
서킷 브레이커(Circuit Breaker) 패턴#
메시징 큐와 함께 서킷 브레이커 패턴을 사용하면 다운스트림 서비스의 장애로부터 시스템을 보호할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
| // Java Spring Boot에서 서킷 브레이커와 메시징 큐 통합 예시
@Service
public class ResilientMessageService {
private final RabbitTemplate rabbitTemplate;
private final CircuitBreakerFactory circuitBreakerFactory;
@Autowired
public ResilientMessageService(
RabbitTemplate rabbitTemplate,
CircuitBreakerFactory circuitBreakerFactory) {
this.rabbitTemplate = rabbitTemplate;
this.circuitBreakerFactory = circuitBreakerFactory;
}
public void sendMessageWithCircuitBreaker(String exchange, String routingKey, Object message) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("messagingCircuitBreaker");
circuitBreaker.run(
// 정상 경로: 메시지 전송 시도
() -> {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
},
// 폴백 경로: 브로커 장애 시 대체 처리
throwable -> {
handleMessageSendFailure(exchange, routingKey, message, throwable);
return false;
}
);
}
private void handleMessageSendFailure(String exchange, String routingKey, Object message, Throwable throwable) {
log.error("메시지 전송 실패: {}", throwable.getMessage());
// 로컬 저장소에 메시지 임시 저장
saveMessageToLocalStorage(exchange, routingKey, message);
// 알림 발송
notifyOperationsTeam("메시징 서비스 장애",
String.format("메시지 전송 실패: 교환기=%s, 라우팅 키=%s, 오류=%s",
exchange, routingKey, throwable.getMessage()));
}
@Scheduled(fixedDelay = 60000) // 1분마다 실행
public void retryFailedMessages() {
List<StoredMessage> failedMessages = getFailedMessagesFromLocalStorage();
if (failedMessages.isEmpty()) {
return;
}
log.info("실패한 메시지 {} 개 재시도 중", failedMessages.size());
for (StoredMessage storedMessage : failedMessages) {
try {
// 서킷 브레이커 상태 확인
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("messagingCircuitBreaker");
if (circuitBreaker.getState() == CircuitBreaker.State.CLOSED) {
// 메시지 재전송 시도
rabbitTemplate.convertAndSend(
storedMessage.getExchange(),
storedMessage.getRoutingKey(),
storedMessage.getMessage()
);
// 성공적으로 전송된 메시지 제거
removeMessageFromLocalStorage(storedMessage.getId());
log.info("메시지 ID {} 재전송 성공", storedMessage.getId());
} else {
log.warn("서킷 브레이커가 열려 있어 재시도 건너뜀 (상태: {})", circuitBreaker.getState());
break; // 브레이커가 열려 있으면 더 이상 시도하지 않음
}
} catch (Exception e) {
log.error("메시지 ID {} 재전송 실패: {}", storedMessage.getId(), e.getMessage());
// 재시도 횟수 및 마지막 시도 시간 업데이트
updateRetryStatus(storedMessage.getId());
}
}
}
}
|
백 프레셔(Back Pressure) 처리#
시스템이 처리할 수 있는 것보다 더 많은 메시지가 유입될 때 백 프레셔 메커니즘을 통해 과부하를 방지한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
| // Kotlin과 Reactor를 사용한 백 프레셔 구현 예시
import org.springframework.stereotype.Service
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.annotation.RabbitListener
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
@Service
class BackPressureMessageProcessor(
private val rabbitTemplate: RabbitTemplate
) {
private val messageCount = AtomicInteger(0)
private val processingSink = Sinks.many().multicast().onBackpressureBuffer<Message>()
private val processingFlux = processingSink.asFlux()
init {
// 초당 최대 100개 메시지만 처리하도록 설정
processingFlux
.publishOn(Schedulers.boundedElastic())
.window(Duration.ofSeconds(1), 100)
.flatMap { window ->
window.doOnNext { msg ->
processMessage(msg)
}
}
.subscribe()
// 현재 처리 상태 모니터링 및 보고
Flux.interval(Duration.ofSeconds(10))
.subscribe {
val count = messageCount.getAndSet(0)
log.info("지난 10초 동안 처리된 메시지: $count (초당 ${count / 10.0})")
}
}
@RabbitListener(queues = ["high-volume-queue"])
fun receiveMessage(message: Message) {
val emitResult = processingSink.tryEmitNext(message)
if (emitResult.isFailure) {
// 백 프레셔 적용 - 큐로 다시 보내기
log.warn("처리 용량 초과, 메시지를 지연 큐로 리다이렉션")
rabbitTemplate.convertAndSend(
"delayed.exchange",
"delayed.routing",
message,
messagePostProcessor -> {
messagePostProcessor.messageProperties.setHeader("x-delay", 30000) // 30초 지연
messagePostProcessor
}
)
}
}
private fun processMessage(message: Message) {
try {
// 실제 메시지 처리 로직
log.info("메시지 처리 중: ${message.id}")
// 처리 시간 시뮬레이션
Thread.sleep((10.).random().toLong())
messageCount.incrementAndGet()
} catch (e: Exception) {
log.error("메시지 처리 오류: ${e.message}", e)
}
}
}
|
분산 추적(Distributed Tracing)#
여러 서비스와 큐를 통과하는 메시지의 흐름을 추적하여 시스템 성능 및 오류를 모니터링한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
| # Python FastAPI와 OpenTelemetry를 사용한 분산 추적 예시
from fastapi import FastAPI, Depends, Header, Request
from typing import Optional, Dict, Any
import json
import uuid
import aio_pika
import asyncio
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.context import Context
from opentelemetry.propagate import extract, inject, get_global_TextMapPropagator
import logging
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# OpenTelemetry 설정
resource = Resource(attributes={
SERVICE_NAME: "order-service"
})
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
trace.set_tracer_provider(TracerProvider(resource=resource))
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
tracer = trace.get_tracer(__name__)
# FastAPI 앱 생성
app = FastAPI(title="주문 서비스")
# FastAPI 계측
FastAPIInstrumentor.instrument_app(app)
# AioPika 계측
AioPikaInstrumentor().instrument()
# RabbitMQ 연결 설정
async def get_rabbitmq_connection():
return await aio_pika.connect_robust("amqp://guest:guest@localhost/")
# 추적 컨텍스트 추출 함수
def extract_tracing_context(headers: Dict[str, str]) -> Context:
return extract(get_global_TextMapPropagator(), headers)
# 메시지 발행 함수
async def publish_message(
exchange_name: str,
routing_key: str,
message: Dict[str, Any],
parent_span: trace.Span
):
connection = await get_rabbitmq_connection()
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
exchange_name,
aio_pika.ExchangeType.TOPIC,
durable=True
)
# 추적 컨텍스트를 메시지 헤더에 주입
message_headers = {}
inject(get_global_TextMapPropagator(), message_headers, parent_span.get_span_context())
# 추적 ID와 메시지 ID 추가
message_id = str(uuid.uuid4())
message_with_context = {
"message_id": message_id,
"trace_id": format(parent_span.get_span_context().trace_id, "032x"),
"span_id": format(parent_span.get_span_context().span_id, "016x"),
"data": message
}
with tracer.start_as_current_span(
f"publish-{routing_key}",
context=trace.set_span_in_context(parent_span),
kind=trace.SpanKind.PRODUCER
) as publish_span:
publish_span.set_attribute("messaging.system", "rabbitmq")
publish_span.set_attribute("messaging.destination", exchange_name)
publish_span.set_attribute("messaging.destination_kind", "exchange")
publish_span.set_attribute("messaging.rabbitmq.routing_key", routing_key)
publish_span.set_attribute("messaging.message_id", message_id)
# 메시지 발행
await exchange.publish(
aio_pika.Message(
body=json.dumps(message_with_context).encode(),
content_type="application/json",
headers=message_headers,
message_id=message_id,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
),
routing_key=routing_key
)
logger.info(f"메시지 발행됨: {routing_key}, ID: {message_id}")
return message_id
# 주문 생성 API 엔드포인트
@app.post("/api/orders", status_code=202)
async def create_order(
order: Dict[str, Any],
request: Request,
trace_parent: Optional[str] = Header(None)
):
# 현재 트레이스 컨텍스트 가져오기
current_span = trace.get_current_span()
# 주문 ID 생성
order_id = str(uuid.uuid4())
order["id"] = order_id
# 주문 생성 처리
with tracer.start_as_current_span(
"process-order",
context=trace.set_span_in_context(current_span),
) as process_span:
process_span.set_attribute("order.id", order_id)
process_span.set_attribute("order.customer_id", order.get("customer_id", "unknown"))
process_span.set_attribute("order.item_count", len(order.get("items", [])))
# 주문 상태 저장 로직 (실제로는 DB에 저장)
logger.info(f"주문 {order_id} 생성 중")
# 비동기 처리를 위한 메시지 발행
message_id = await publish_message(
"orders",
"orders.created",
order,
process_span
)
return {
"order_id": order_id,
"status": "processing",
"message": "주문이 처리 중입니다",
"trace_id": format(current_span.get_span_context().trace_id, "032x")
}
# 메시지 소비자 설정
async def setup_consumer():
connection = await get_rabbitmq_connection()
# 채널 생성
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
# 교환기 및 큐 선언
orders_exchange = await channel.declare_exchange(
"orders",
aio_pika.ExchangeType.TOPIC,
durable=True
)
order_processing_queue = await channel.declare_queue(
"order_processing",
durable=True
)
await order_processing_queue.bind(orders_exchange, "orders.created")
# 메시지 처리 콜백
async def process_order_message(message: aio_pika.IncomingMessage):
async with message.process():
# 트레이싱 컨텍스트 추출
trace_context = extract_tracing_context(dict(message.headers) if message.headers else {})
# 메시지 내용 파싱
body = json.loads(message.body.decode())
order_data = body.get("data", {})
message_id = body.get("message_id", "unknown")
trace_id = body.get("trace_id", "unknown")
logger.info(f"주문 메시지 수신: {message_id}, 트레이스 ID: {trace_id}")
# 메시지 처리 스팬 생성
with tracer.start_as_current_span(
"consume-order",
context=trace_context,
kind=trace.SpanKind.CONSUMER
) as consume_span:
consume_span.set_attribute("messaging.system", "rabbitmq")
consume_span.set_attribute("messaging.operation", "process")
consume_span.set_attribute("messaging.message_id", message_id)
consume_span.set_attribute("messaging.rabbitmq.routing_key", message.routing_key)
consume_span.set_attribute("order.id", order_data.get("id", "unknown"))
try:
# 주문 처리 로직
logger.info(f"주문 {order_data.get('id')} 처리 중")
# 처리 시간 시뮬레이션
await asyncio.sleep(0.5)
# 재고 확인 스팬
with tracer.start_as_current_span("check-inventory") as inventory_span:
inventory_span.set_attribute("order.item_count", len(order_data.get("items", [])))
await asyncio.sleep(0.3) # 재고 확인 시뮬레이션
# 결제 처리 스팬
with tracer.start_as_current_span("process-payment") as payment_span:
payment_span.set_attribute("order.total", order_data.get("total_amount", 0))
await asyncio.sleep(0.4) # 결제 처리 시뮬레이션
# 주문 완료 이벤트 발행
await publish_message(
"orders",
"orders.processed",
{
"order_id": order_data.get("id"),
"status": "completed",
"processed_at": str(asyncio.get_event_loop().time())
},
consume_span
)
logger.info(f"주문 {order_data.get('id')} 처리 완료")
except Exception as e:
error_msg = f"주문 처리 오류: {str(e)}"
logger.error(error_msg)
consume_span.record_exception(e)
consume_span.set_status(trace.StatusCode.ERROR, error_msg)
# 주문 실패 이벤트 발행
await publish_message(
"orders",
"orders.failed",
{
"order_id": order_data.get("id"),
"status": "failed",
"error": str(e),
"failed_at": str(asyncio.get_event_loop().time())
},
consume_span
)
# 소비자 시작
await order_processing_queue.consume(process_order_message)
logger.info("주문 처리 소비자가 시작되었습니다")
# 애플리케이션 시작 이벤트에 소비자 설정 추가
@app.on_event("startup")
async def startup_event():
# 백그라운드 작업으로 소비자 설정
asyncio.create_task(setup_consumer())
logger.info("애플리케이션이 시작되었습니다")
# 애플리케이션 종료 이벤트
@app.on_event("shutdown")
async def shutdown_event():
logger.info("애플리케이션이 종료됩니다")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
|
메시징 큐 아키텍처 패턴#
데이터 스트리밍 아키텍처#
메시징 큐를 사용하여 대량의 데이터를 연속적으로 처리하고 실시간 분석을 지원하는 아키텍처이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
| // Node.js에서 Kafka를 사용한 데이터 스트리밍 아키텍처 예시
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');
const express = require('express');
const cors = require('cors');
// Kafka 설정
const kafka = new Kafka({
clientId: 'data-stream-processor',
brokers: ['localhost:9092']
});
// Redis 설정
const redisClient = new Redis();
// Express 설정
const app = express();
app.use(cors());
app.use(express.json());
// 스트림 처리 클래스
class StreamProcessor {
constructor() {
this.producer = kafka.producer();
this.consumer = kafka.consumer({ groupId: 'stream-processor-group' });
this.metricConsumer = kafka.consumer({ groupId: 'metrics-processor-group' });
this.eventCounts = {};
this.realtimeStats = {};
}
async start() {
// 생산자 및 소비자 연결
await this.producer.connect();
await this.consumer.connect();
await this.metricConsumer.connect();
// 토픽 구독
await this.consumer.subscribe({ topics: ['data-events'], fromBeginning: false });
await this.metricConsumer.subscribe({ topics: ['metrics'], fromBeginning: false });
// 데이터 이벤트 처리
await this.consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat }) => {
try {
const eventData = JSON.parse(message.value.toString());
const eventType = eventData.type;
console.log(`데이터 이벤트 수신: ${eventType}, ID: ${eventData.id}`);
// 이벤트 카운트 업데이트
this.eventCounts[eventType] = (this.eventCounts[eventType] || 0) + 1;
// 이벤트 타입에 따른 처리
await this.processEvent(eventData);
// 주기적 하트비트 전송
await heartbeat();
} catch (error) {
console.error('이벤트 처리 오류:', error);
}
}
});
// 메트릭 처리
await this.metricConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const metricData = JSON.parse(message.value.toString());
// 지표 업데이트
this.updateMetrics(metricData);
// Redis에 실시간 지표 저장
await this.storeMetricsInRedis(metricData);
} catch (error) {
console.error('메트릭 처리 오류:', error);
}
}
});
// 주기적인 통계 보고
setInterval(() => this.reportStats(), 60000);
console.log('스트림 프로세서가 시작되었습니다.');
}
async processEvent(eventData) {
// 이벤트 타입에 따른 처리 로직
switch (eventData.type) {
case 'user_action':
await this.processUserAction(eventData);
break;
case 'system_metric':
await this.processSystemMetric(eventData);
break;
case 'transaction':
await this.processTransaction(eventData);
break;
default:
console.log(`알 수 없는 이벤트 타입: ${eventData.type}`);
}
// 메트릭 토픽으로 집계 데이터 전송
await this.sendMetrics(eventData);
}
async processUserAction(eventData) {
// 사용자 행동 이벤트 처리
console.log(`사용자 행동 처리: ${eventData.userId}, 액션: ${eventData.action}`);
// 사용자 행동 집계
const key = `user:${eventData.userId}:actions`;
await redisClient.hincrby(key, eventData.action, 1);
await redisClient.expire(key, 86400); // 24시간 유지
}
async processSystemMetric(eventData) {
// 시스템 메트릭 처리
console.log(`시스템 메트릭 처리: ${eventData.system}, 지표: ${eventData.metric}`);
// 시스템 메트릭 집계
const key = `system:${eventData.system}:metrics`;
await redisClient.hset(key, eventData.metric, eventData.value);
}
async processTransaction(eventData) {
// 트랜잭션 이벤트 처리
console.log(`트랜잭션 처리: ${eventData.transactionId}, 금액: ${eventData.amount}`);
// 트랜잭션 집계
const dayKey = new Date().toISOString().split('T')[0];
await redisClient.hincrby(`transactions:${dayKey}:count`, eventData.type, 1);
await redisClient.hincrby(`transactions:${dayKey}:amount`, eventData.type, eventData.amount);
}
async sendMetrics(eventData) {
// 메트릭 데이터 생성
const metricData = {
timestamp: Date.now(),
source: eventData.source || 'unknown',
eventType: eventData.type,
dimensions: {
// 이벤트 타입별 차원 추출
…this.extractDimensions(eventData)
},
metrics: {
// 이벤트 타입별 지표 추출
…this.extractMetrics(eventData)
}
};
// 메트릭 토픽으로 전송
await this.producer.send({
topic: 'metrics',
messages: [
{ value: JSON.stringify(metricData) }
]
});
}
extractDimensions(eventData) {
// 이벤트에서 차원 정보 추출
const dimensions = {};
switch (eventData.type) {
case 'user_action':
dimensions.userId = eventData.userId;
dimensions.action = eventData.action;
dimensions.platform = eventData.platform;
break;
case 'system_metric':
dimensions.system = eventData.system;
dimensions.metric = eventData.metric;
dimensions.instance = eventData.instance;
break;
case 'transaction':
dimensions.transactionType = eventData.transactionType;
dimensions.paymentMethod = eventData.paymentMethod;
dimensions.country = eventData.country;
break;
}
return dimensions;
}
extractMetrics(eventData) {
// 이벤트에서 지표 정보 추출
const metrics = {};
switch (eventData.type) {
case 'user_action':
metrics.duration = eventData.duration || 0;
metrics.count = 1;
break;
case 'system_metric':
metrics.value = eventData.value;
break;
case 'transaction':
metrics.amount = eventData.amount;
metrics.count = 1;
break;
}
return metrics;
}
updateMetrics(metricData) {
const { eventType, dimensions, metrics, timestamp } = metricData;
const timeWindow = Math.floor(timestamp / 60000) * 60000; // 1분 윈도우
// 시간 윈도우별 지표 집계
if (!this.realtimeStats[timeWindow]) {
this.realtimeStats[timeWindow] = {};
}
if (!this.realtimeStats[timeWindow][eventType]) {
this.realtimeStats[timeWindow][eventType] = {
count: 0,
dimensions: {},
metrics: {}
};
}
const stats = this.realtimeStats[timeWindow][eventType];
stats.count += 1;
// 차원별 집계
for (const [dimName, dimValue] of Object.entries(dimensions)) {
if (!stats.dimensions[dimName]) {
stats.dimensions[dimName] = {};
}
if (!stats.dimensions[dimName][dimValue]) {
stats.dimensions[dimName][dimValue] = 0;
}
stats.dimensions[dimName][dimValue] += 1;
}
// 지표 집계
for (const [metricName, metricValue] of Object.entries(metrics)) {
if (!stats.metrics[metricName]) {
stats.metrics[metricName] = {
sum: 0,
count: 0,
min: Number.MAX_VALUE,
max: Number.MIN_VALUE
};
}
const metricStats = stats.metrics[metricName];
metricStats.sum += metricValue;
metricStats.count += 1;
metricStats.min = Math.min(metricStats.min, metricValue);
metricStats.max = Math.max(metricStats.max, metricValue);
}
// 오래된 윈도우 정리 (30분 이상 지난 데이터)
const cutoffTime = Date.now() - 30 * 60000;
for (const windowTime of Object.keys(this.realtimeStats)) {
if (parseInt(windowTime) < cutoffTime) {
delete this.realtimeStats[windowTime];
}
}
}
async storeMetricsInRedis(metricData) {
const { eventType, dimensions, metrics, timestamp } = metricData;
const timeWindow = Math.floor(timestamp / 60000); // 1분 윈도우
// Redis에 지표 저장
const metricKey = `metrics:${timeWindow}:${eventType}`;
// 지표 데이터 저장
for (const [metricName, metricValue] of Object.entries(metrics)) {
await redisClient.hincrby(`${metricKey}:sum`, metricName, metricValue);
await redisClient.hincrby(`${metricKey}:count`, metricName, 1);
}
// 차원 데이터 저장
for (const [dimName, dimValue] of Object.entries(dimensions)) {
await redisClient.hincrby(`${metricKey}:dimension:${dimName}`, dimValue, 1);
}
// 만료 시간 설정 (24시간)
await redisClient.expire(metricKey, 86400);
}
reportStats() {
console.log('\n--- 스트림 프로세서 통계 ---');
console.log('이벤트 카운트:', this.eventCounts);
// 최근 5분 통계 계산
const now = Date.now();
const recentWindows = Object.keys(this.realtimeStats)
.filter(time => now - parseInt(time) <= 5 * 60000)
.sort();
if (recentWindows.length > 0) {
console.log('\n최근 5분 통계:');
for (const window of recentWindows) {
const windowTime = new Date(parseInt(window)).toISOString();
console.log(`\n시간 윈도우: ${windowTime}`);
for (const [eventType, stats] of Object.entries(this.realtimeStats[window])) {
console.log(` ${eventType}: ${stats.count}건`);
// 주요 지표 출력
for (const [metricName, metricStats] of Object.entries(stats.metrics)) {
const avg = metricStats.sum / metricStats.count;
console.log(` ${metricName}: avg=${avg.toFixed(2)}, min=${metricStats.min}, max=${metricStats.max}`);
}
}
}
}
console.log('\n------------------------\n');
}
async stop() {
await this.producer.disconnect();
await this.consumer.disconnect();
await this.metricConsumer.disconnect();
console.log('스트림 프로세서가 중지되었습니다.');
}
}
// API 설정
app.get('/api/stats/realtime', async (req, res) => {
try {
// 최근 5분 통계 조회
const now = Date.now();
const fiveMinutesAgo = now - 5 * 60000;
// Redis에서 최근 통계 데이터 조회
const timeWindows = [];
for (let t = Math.floor(fiveMinutesAgo / 60000); t <= Math.floor(now / 60000); t++) {
timeWindows.push(t);
}
const result = {
timeWindows: [],
eventTypes: {}
};
for (const window of timeWindows) {
const windowTime = new Date(window * 60000).toISOString();
result.timeWindows.push(windowTime);
// 각 이벤트 타입별 데이터 조회
const eventTypes = ['user_action', 'system_metric', 'transaction'];
for (const eventType of eventTypes) {
if (!result.eventTypes[eventType]) {
result.eventTypes[eventType] = {
counts: [],
metrics: {}
};
}
const metricKey = `metrics:${window}:${eventType}`;
// 이벤트 횟수 조회
const countData = await redisClient.hgetall(`${metricKey}:count`);
let totalCount = 0;
for (const count of Object.values(countData)) {
totalCount += parseInt(count || 0);
}
result.eventTypes[eventType].counts.push(totalCount);
// 주요 지표 조회
const sumData = await redisClient.hgetall(`${metricKey}:sum`);
for (const [metricName, sum] of Object.entries(sumData)) {
if (!result.eventTypes[eventType].metrics[metricName]) {
result.eventTypes[eventType].metrics[metricName] = {
sums: []
};
}
result.eventTypes[eventType].metrics[metricName].sums.push(parseInt(sum || 0));
}
}
}
res.json(result);
} catch (error) {
console.error('통계 조회 오류:', error);
res.status(500).json({ error: error.message });
}
});
// 서버 시작
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`API 서버가 포트 ${PORT}에서 실행 중입니다.`);
});
// 스트림 프로세서 시작
const processor = new StreamProcessor();
processor.start().catch(console.error);
// 정상 종료 처리
process.on('SIGINT', async () => {
console.log('애플리케이션 종료 중…');
await processor.stop();
await redisClient.quit();
process.exit(0);
});
|
명령 및 이벤트 기반 아키텍처#
명령(Command)과 이벤트(Event)를 통해 시스템 컴포넌트 간의 통신을 조정하는 아키텍처 패턴이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
| // C#을 사용한 명령 및 이벤트 기반 아키텍처 예시
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;
using System.Text;
// 명령 인터페이스
public interface ICommand
{
Guid Id { get; }
string CommandType { get; }
DateTime Timestamp { get; }
}
// 이벤트 인터페이스
public interface IEvent
{
Guid Id { get; }
string EventType { get; }
DateTime Timestamp { get; }
Guid? CorrelationId { get; }
}
// 명령 핸들러 인터페이스
public interface ICommandHandler<TCommand> where TCommand : ICommand
{
Task<List<IEvent>> HandleAsync(TCommand command);
}
// 이벤트 핸들러 인터페이스
public interface IEventHandler<TEvent> where TEvent : IEvent
{
Task HandleAsync(TEvent @event);
}
// 명령 버스
public class CommandBus
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly ILogger<CommandBus> _logger;
private readonly string _exchangeName = "commands";
public CommandBus(IConnection connection, ILogger<CommandBus> logger)
{
_connection = connection;
_channel = connection.CreateModel();
_logger = logger;
// 명령 교환기 설정
_channel.ExchangeDeclare(_exchangeName, ExchangeType.Direct, durable: true);
}
public async Task SendAsync<TCommand>(TCommand command) where TCommand : ICommand
{
var commandType = command.GetType().Name;
var routingKey = commandType;
// 명령 직렬화
var message = JsonConvert.SerializeObject(command);
var body = Encoding.UTF8.GetBytes(message);
// 기본 속성 설정
var properties = _channel.CreateBasicProperties();
properties.MessageId = command.Id.ToString();
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
properties.Headers = new Dictionary<string, object>
{
{ "command_type", commandType }
};
properties.Persistent = true;
// 명령 전송
_channel.BasicPublish(
exchange: _exchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
_logger.LogInformation($"명령 전송됨: {commandType}, ID: {command.Id}");
}
}
// 이벤트 버스
public class EventBus
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly ILogger<EventBus> _logger;
private readonly string _exchangeName = "events";
public EventBus(IConnection connection, ILogger<EventBus> logger)
{
_connection = connection;
_channel = connection.CreateModel();
_logger = logger;
// 이벤트 교환기 설정
_channel.ExchangeDeclare(_exchangeName, ExchangeType.Topic, durable: true);
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
var eventType = @event.GetType().Name;
var routingKey = eventType.Replace("Event", "").ToLower() + ".happened";
// 이벤트 직렬화
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
// 기본 속성 설정
var properties = _channel.CreateBasicProperties();
properties.MessageId = @event.Id.ToString();
properties.CorrelationId = @event.CorrelationId?.ToString();
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
properties.Headers = new Dictionary<string, object>
{
{ "event_type", eventType }
};
properties.Persistent = true;
// 이벤트 발행
_channel.BasicPublish(
exchange: _exchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
_logger.LogInformation($"이벤트 발행됨: {eventType}, ID: {@event.Id}, 상관관계 ID: {@event.CorrelationId}");
}
public void Subscribe<TEvent>(string queueName, Func<string, Task> callback) where TEvent : IEvent
{
var eventType = typeof(TEvent).Name;
var routingKey = eventType.Replace("Event", "").ToLower() + ".happened";
// 큐 선언
_channel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false);
// 큐를 교환기에 바인딩
_channel.QueueBind(
queue: queueName,
exchange: _exchangeName,
routingKey: routingKey);
// 소비자 설정
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
_logger.LogInformation($"이벤트 수신됨: {eventType}, 큐: {queueName}");
try
{
await callback(message);
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception ex)
{
_logger.LogError(ex, $"이벤트 처리 오류: {eventType}");
_channel.BasicNack(ea.DeliveryTag, false, true);
}
};
_channel.BasicConsume(
queue: queueName,
autoAck: false,
consumer: consumer);
_logger.LogInformation($"이벤트 구독 시작: {eventType}, 큐: {queueName}");
}
}
// 명령 프로세서
public class CommandProcessor
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<CommandProcessor> _logger;
private readonly EventBus _eventBus;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _exchangeName = "commands";
public CommandProcessor(
IServiceProvider serviceProvider,
IConnection connection,
EventBus eventBus,
ILogger<CommandProcessor> logger)
{
_serviceProvider = serviceProvider;
_connection = connection;
_channel = connection.CreateModel();
_eventBus = eventBus;
_logger = logger;
// 명령 교환기 설정
_channel.ExchangeDeclare(_exchangeName, ExchangeType.Direct, durable: true);
}
public void RegisterHandler<TCommand, THandler>(string queueName)
where TCommand : ICommand
where THandler : ICommandHandler<TCommand>
{
var commandType = typeof(TCommand).Name;
var routingKey = commandType;
// 큐 선언
_channel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false);
// 큐를 교환기에 바인딩
_channel.QueueBind(
queue: queueName,
exchange: _exchangeName,
routingKey: routingKey);
// 소비자 설정
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
_logger.LogInformation($"명령 수신됨: {commandType}, 큐: {queueName}");
try
{
// 명령 역직렬화
var command = JsonConvert.DeserializeObject<TCommand>(message);
// 핸들러 인스턴스 생성
using var scope = _serviceProvider.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<THandler>();
// 명령 처리
var events = await handler.HandleAsync(command);
// 발생한 이벤트 발행
if (events != null)
{
foreach (var @event in events)
{
await _eventBus.PublishAsync(@event);
}
}
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception ex)
{
_logger.LogError(ex, $"명령 처리 오류: {commandType}");
_channel.BasicNack(ea.DeliveryTag, false, true);
}
};
_channel.BasicConsume(
queue: queueName,
autoAck: false,
consumer: consumer);
_logger.LogInformation($"명령 핸들러 등록됨: {commandType}, 큐: {queueName}");
}
}
// 구체적인 명령 및 핸들러 구현 예시
public class CreateOrderCommand : ICommand
{
public Guid Id { get; } = Guid.NewGuid();
public string CommandType => nameof(CreateOrderCommand);
public DateTime Timestamp { get; } = DateTime.UtcNow;
public Guid CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
public string ShippingAddress { get; set; }
}
public class OrderItem
{
public Guid ProductId { get; set; }
public int Quantity { get; set; }
public decimal Price { get; set; }
}
public class OrderCreatedEvent : IEvent
{
public Guid Id { get; } = Guid.NewGuid();
public string EventType => nameof(OrderCreatedEvent);
public DateTime Timestamp { get; } = DateTime.UtcNow;
public Guid? CorrelationId { get; set; }
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public string OrderStatus { get; set; }
}
public class CreateOrderHandler : ICommandHandler<CreateOrderCommand>
{
private readonly ILogger<CreateOrderHandler> _logger;
// 실제 구현에서는 DB 저장소 등 주입
public CreateOrderHandler(ILogger<CreateOrderHandler> logger)
{
_logger = logger;
}
public async Task<List<IEvent>> HandleAsync(CreateOrderCommand command)
{
_logger.LogInformation($"주문 생성 처리 중: {command.Id}");
// 비즈니스 로직 수행
// - 재고 확인
// - 주문 유효성 검증
// - DB에 주문 저장
await Task.Delay(100); // DB 작업 시뮬레이션
// 주문 ID 생성
var orderId = Guid.NewGuid();
// 총액 계산
decimal totalAmount = 0;
foreach (var item in command.Items)
{
totalAmount += item.Price * item.Quantity;
}
// 주문 생성 이벤트 반환
var orderCreatedEvent = new OrderCreatedEvent
{
CorrelationId = command.Id,
OrderId = orderId,
CustomerId = command.CustomerId,
TotalAmount = totalAmount,
OrderStatus = "Created"
};
return new List<IEvent> { orderCreatedEvent };
}
}
// 애플리케이션 설정
public class Program
{
public static async Task Main(string[] args)
{
var host = CreateHostBuilder(args).Build();
// 서비스 시작
await host.RunAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
// RabbitMQ 연결 설정
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
var connection = factory.CreateConnection();
services.AddSingleton(connection);
services.AddSingleton<CommandBus>();
services.AddSingleton<EventBus>();
services.AddSingleton<CommandProcessor>();
// 명령 핸들러 등록
services.AddTransient<CreateOrderHandler>();
// 백그라운드 서비스 등록
services.AddHostedService<OrderProcessingService>();
});
}
// 백그라운드 서비스
public class OrderProcessingService : BackgroundService
{
private readonly ILogger<OrderProcessingService> _logger;
private readonly CommandProcessor _commandProcessor;
private readonly EventBus _eventBus;
public OrderProcessingService(
ILogger<OrderProcessingService> logger,
CommandProcessor commandProcessor,
EventBus eventBus)
{
_logger = logger;
_commandProcessor = commandProcessor;
_eventBus = eventBus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("주문 처리 서비스 시작");
// 명령 핸들러 등록
_commandProcessor.RegisterHandler<CreateOrderCommand, CreateOrderHandler>("create-order-queue");
// 이벤트 구독
_eventBus.Subscribe<OrderCreatedEvent>("order-notification-queue", async (message) =>
{
var @event = JsonConvert.DeserializeObject<OrderCreatedEvent>(message);
_logger.LogInformation($"주문 생성 이벤트 처리: {@event.OrderId}");
// 알림 발송 등의 로직 처리
await Task.Delay(50);
_logger.LogInformation($"주문 알림 발송 완료: {@event.OrderId}");
});
// 서비스 종료까지 대기
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
|
메시징 큐 설계 및 구현 지침#
효율적이고 안정적인 메시징 시스템을 구축하기 위한 주요 설계 및 구현 지침
- 메시지 설계 원칙
메시지는 시스템 간의 통신 단위이므로 명확하고 일관된 설계가 중요하다.- 자기 완결성(Self-contained): 메시지는 필요한 모든 정보를 포함해야 한다.
- 스키마 버전 관리: 메시지 구조 변경에 대비한 버전 관리 전략이 필요하다.
- 적절한 크기: 너무 크거나 작은 메시지는 성능에 영향을 미친다.
- 표준화된 형식: JSON, Avro, Protocol Buffers 등 표준 형식을 사용한다.
- 메타데이터 포함: 메시지 ID, 타임스탬프, 추적 정보 등을 포함한다.
- 큐 구조 설계
큐의 구조와 관계는 메시징 시스템의 성능과 확장성에 직접적인 영향을 미친다.- 목적별 큐 분리: 각 비즈니스 기능이나 처리 유형에 따라, 그리고 중요도나 우선순위에 따라 큐를 분리한다.
- 큐 크기 제한: 무제한 큐는 메모리 문제를 일으킬 수 있으므로 적절한 제한이 필요하다.
- 데드 레터 큐 구성: 처리 실패한 메시지를 저장하고 분석할 수 있는 별도의 큐를 마련한다.
- 지연 큐 활용: 특정 시간 후에 처리해야 하는 메시지를 위한 지연 큐를 구성한다.
- 우선순위 큐 고려: 중요도에 따라 메시지 처리 순서를 조정할 수 있는 우선순위 큐를 활용한다.
- 오류 처리 및 복원력 전략
장애 상황에서도 데이터 손실 없이 안정적으로 작동하는 메시징 시스템을 구축하기 위한 전략이다.- 자동 재시도: 일시적인 오류에 대응하기 위한 자동 재시도 메커니즘을 구현한다.
- 지수 백오프(Exponential Backoff): 반복적인 실패 시 재시도 간격을 점진적으로 늘린다.
- 서킷 브레이커 패턴: 다운스트림 서비스 장애 시 요청을 차단하여 시스템을 보호한다.
- 멱등성 보장: 중복 메시지 처리가 안전하도록 멱등성을 구현한다.
- 부분 실패 처리: 배치 처리 시 일부 실패에 대한 대응 전략을 마련한다.
- 확장성 고려사항
증가하는 부하에 대응할 수 있는 확장 가능한 메시징 시스템을 설계하기 위한 고려사항이다.- 수평적 확장: 소비자 그룹을 통해 처리량을 향상시킨다.
- 메시지 파티셔닝: 관련 메시지가 동일한 소비자에게 전달되도록 파티셔닝한다.
- 비동기 처리: 소비자의 처리 시간이 생산자에게 영향을 미치지 않도록 한다.
- 부하 분산: 여러 브로커와 큐에 부하를 분산시킨다.
- 클러스터링: 고가용성을 위한 브로커 클러스터를 구성한다.
- 모니터링 및 운영 최적화
효과적인 운영을 위한 모니터링 및 관리 전략이다.- 주요 지표 추적: 큐 깊이, 처리율, 오류율 등의 핵심 지표를 모니터링한다.
- 경보 설정: 문제 상황에 대한 조기 경보 시스템을 구축한다.
- 로깅 전략: 디버깅과 분석을 위한 효과적인 로깅을 구현한다.
- 성능 최적화: 배치 처리, 메시지 압축, 소비자 수 조정 등을 통해 성능을 최적화한다.
- 자동화된 운영: 자동 확장, 자가 복구 등의 자동화 기능을 구현한다.
메시징 큐 도입 시 고려사항#
메시징 큐 도입을 고려할 때 평가해야 할 주요 사항과 권장 사항이다.
- 적합성 평가
모든 상황에 메시징 큐가 최적의 솔루션은 아니다. 다음 상황에서 메시징 큐 도입을 고려해야 한다.- 비동기 처리가 필요한 경우: 즉각적인 응답이 불필요한 작업
- 부하 분산이 필요한 경우: 트래픽 스파이크 처리
- 시스템 간 느슨한 결합이 필요한 경우: 독립적인 서비스 운영
- 내구성 있는 통신이 필요한 경우: 메시지 손실 방지
- 처리 속도 차이가 있는 경우: 생산자와 소비자 간 속도 불일치
- 기술 선택 기준
메시징 솔루션 선택 시 고려해야 할 주요 기준이다.- 처리량 요구사항: 초당 처리해야 하는 메시지 수
- 지연 시간 요구사항: 허용 가능한 메시지 전달 및 처리 지연
- 내구성 요구사항: 메시지 손실 허용 정도
- 전달 보장: At-most-once, At-least-once, Exactly-once 중 필요한 수준
- 확장성 요구사항: 향후 예상되는 성장과 확장 니즈
- 운영 복잡성: 관리 및 모니터링의 용이성
- 비용 및 리소스: 라이선스, 인프라, 유지보수 비용
- 구현 및 통합 권장사항
효과적인 메시징 큐 구현 및 통합을 위한 실용적인 권장사항이다.- 점진적 도입: 모든 시스템을 한 번에 전환하기보다 중요도가 낮은 기능부터 점진적으로 메시징 큐를 적용한다.
- 비동기 처리 패턴 채택: 동기식 요청-응답에서 비동기 이벤트 기반 패턴으로 전환한다.
- 오류 처리 우선화: 구현 초기 단계부터 견고한 오류 처리를 설계한다.
- 로컬 개발 환경 구성: 개발자가 로컬에서 메시징 시스템을 테스트할 수 있도록 환경을 구성한다.
- 자동화된 테스트: 메시지 생산, 소비, 오류 처리를 포함한 자동화된 테스트를 구현한다.
- 문서화 및 표준화: 메시지 형식, 큐 명명 규칙, 오류 처리 절차 등을 명확하게 문서화한다.
- 모니터링 인프라 구축: 메시징 시스템 도입과 함께 모니터링 인프라를 구축한다.
용어 정리#
참고 및 출처#
RabbitMQ RabbitMQ는 Erlang 언어로 작성된 오픈 소스 메시지 브로커 시스템으로, AMQP(Advanced Message Queuing Protocol)를 구현하고 있다. 2007년에 처음 출시되었으며, 현재는 VMware의 자회사인 Pivotal Software에서 관리하고 있다. RabbitMQ는 안정성, 확장성, 다양한 메시징 패턴 지원 등으로 인해 많은 기업들이 메시지 기반 아키텍처의 핵심 컴포넌트로 채택하고 있다.
주요 특징 표준 프로토콜 지원: AMQP 0-9-1을 기본으로 하며, STOMP, MQTT, HTTP 등 다양한 프로토콜을 플러그인을 통해 지원한다. 유연한 라우팅: Exchange와 바인딩을 통한 강력하고 유연한 메시지 라우팅 기능을 제공한다. 클러스터링: 높은 가용성과 처리량을 위한 내장 클러스터링 기능을 제공한다. 관리 인터페이스: 직관적인 웹 기반 관리 UI와 HTTP API를 제공한다. 플러그인 아키텍처: 기능을 확장할 수 있는 다양한 플러그인을 지원한다. 다양한 언어 지원: Java, Python, Ruby, PHP, C#, JavaScript 등 다양한 언어의 클라이언트 라이브러리를 제공한다. RabbitMQ의 주요 사용 사례 비동기 처리: 시간이 오래 걸리는 작업을 비동기적으로 처리할 수 있다. 서비스 간 통신: 마이크로서비스 아키텍처에서 서비스 간 통신 채널로 활용된다. 부하 분산: 작업을 여러 워커에게 분산시켜 시스템의 부하를 균등하게 분배한다. 이벤트 기반 아키텍처: 이벤트를 생성하고 소비하는 이벤트 기반 시스템의 기반이 된다. 데이터 스트리밍: 실시간 데이터 스트림을 처리하는 데 사용된다. RabbitMQ의 핵심 개념 AMQP 프로토콜
AMQP(Advanced Message Queuing Protocol)는 메시지 지향 미들웨어를 위한 개방형 표준 프로토콜이다. AMQP의 주요 개념을 이해하는 것은 RabbitMQ를 효과적으로 사용하기 위한 기본이다.
...