Idempotent Consumer#
Idempotent Consumer는 마이크로서비스 아키텍처(MSA)의 메시징 패턴 중 하나로, 메시지의 중복 처리를 방지하고 시스템의 일관성을 유지하는 데 중요한 역할을 한다.
Idempotent Consumer는 동일한 메시지를 여러 번 처리하더라도 시스템의 상태가 변하지 않도록 설계된 소비자를 의미한다. 즉, 메시지의 중복 처리가 발생해도 최종 결과는 항상 동일하다.
Idempotent Consumer 패턴은 MSA 환경에서 메시지의 안정적인 처리를 보장하고, 시스템의 일관성을 유지하는 데 중요한 역할을 한다. 이 패턴을 적절히 구현함으로써 분산 시스템의 신뢰성과 견고성을 크게 향상시킬 수 있다.
Idempotent Consumer의 필요성#
- 메시지 중복 발생 가능성: 네트워크 오류, 시스템 장애 등으로 인해 메시지가 중복 전송될 수 있다.
- At-least-once 전달: 많은 메시징 시스템이 적어도 한 번은 메시지를 전달하는 것을 보장하지만, 이는 중복 전달의 가능성을 내포한다.
- 데이터 일관성 유지: 중복 처리로 인한 데이터 불일치를 방지해야 한다.
Idempotent Consumer 구현 방법#
고유 식별자 사용:
- 각 메시지에 고유한 ID를 부여한다.
- 소비자는 처리한 메시지의 ID를 기록하고, 이미 처리한 ID의 메시지는 무시한다.
상태 기반 처리:
- 작업의 결과를 기반으로 처리 여부를 결정한다.
- 예: 이미 존재하는 주문은 다시 생성하지 않는다.
버전 관리:
- 각 엔티티에 버전 번호를 부여한다.
- 메시지 처리 시 버전을 확인하여 중복 처리를 방지한다.
데이터베이스 제약 조건 활용:
- 유니크 제약 조건을 사용하여 중복 데이터 삽입을 방지한다.
주의사항#
- 성능 고려: 처리된 메시지 ID를 영구적으로 저장할 경우, 데이터베이스 부하가 증가할 수 있다.
- ID 관리: 메시지 ID의 유일성을 보장해야 한다.
- 동시성 처리: 여러 인스턴스가 동시에 같은 메시지를 처리하려 할 때의 동시성 문제를 고려해야 한다.
- 데이터 일관성 유지: 중복 처리로 인한 데이터 불일치를 방지한다.
- 시스템 안정성 향상: 예기치 않은 중복 메시지 처리에 대비할 수 있다.
- 장애 복구 용이성: 시스템 장애 후 복구 시 안전하게 메시지를 재처리할 수 있다.
구현 예시#
Javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| // Idempotent Consumer 구현
class IdempotentConsumer {
constructor(options = {}) {
this.processedMessages = new Map();
this.ttl = options.ttl || 24 * 60 * 60 * 1000; // 24시간
this.deduplicationStore = new DeduplicationStore();
}
async processMessage(message) {
const messageId = this.extractMessageId(message);
try {
// 메시지 중복 체크
if (await this.isDuplicate(messageId)) {
console.log(`Duplicate message detected: ${messageId}`);
return this.handleDuplicate(message);
}
// 멱등성 키 저장
await this.markAsProcessing(messageId);
// 메시지 처리
const result = await this.executeProcessing(message);
// 처리 완료 표시
await this.markAsCompleted(messageId, result);
return result;
} catch (error) {
// 실패 처리
await this.handleProcessingFailure(messageId, error);
throw error;
}
}
async isDuplicate(messageId) {
return await this.deduplicationStore.exists(messageId);
}
extractMessageId(message) {
// 메시지에서 고유 식별자 추출
return message.id || message.messageId || crypto.randomUUID();
}
}
|
Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public class IdempotentConsumer {
private Set<String> processedMessageIds = new HashSet<>();
public void processMessage(Message message) {
String messageId = message.getId();
if (!processedMessageIds.contains(messageId)) {
// 메시지 처리 로직
// ...
// 처리 완료 후 ID 기록
processedMessageIds.add(messageId);
} else {
System.out.println("중복 메시지 무시: " + messageId);
}
}
}
|
참고 및 출처#