Idempotent Consumer

Idempotent Consumer는 마이크로서비스 아키텍처(MSA)의 메시징 패턴 중 하나로, 메시지의 중복 처리를 방지하고 시스템의 일관성을 유지하는 데 중요한 역할을 한다.

Idempotent Consumer는 동일한 메시지를 여러 번 처리하더라도 시스템의 상태가 변하지 않도록 설계된 소비자를 의미한다. 즉, 메시지의 중복 처리가 발생해도 최종 결과는 항상 동일하다.

Idempotent Consumer 패턴은 MSA 환경에서 메시지의 안정적인 처리를 보장하고, 시스템의 일관성을 유지하는 데 중요한 역할을 한다. 이 패턴을 적절히 구현함으로써 분산 시스템의 신뢰성과 견고성을 크게 향상시킬 수 있다.

Idempotent Consumer의 필요성

  1. 메시지 중복 발생 가능성: 네트워크 오류, 시스템 장애 등으로 인해 메시지가 중복 전송될 수 있다.
  2. At-least-once 전달: 많은 메시징 시스템이 적어도 한 번은 메시지를 전달하는 것을 보장하지만, 이는 중복 전달의 가능성을 내포한다.
  3. 데이터 일관성 유지: 중복 처리로 인한 데이터 불일치를 방지해야 한다.

Idempotent Consumer 구현 방법

  1. 고유 식별자 사용:

    • 각 메시지에 고유한 ID를 부여한다.
    • 소비자는 처리한 메시지의 ID를 기록하고, 이미 처리한 ID의 메시지는 무시한다.
  2. 상태 기반 처리:

    • 작업의 결과를 기반으로 처리 여부를 결정한다.
    • 예: 이미 존재하는 주문은 다시 생성하지 않는다.
  3. 버전 관리:

    • 각 엔티티에 버전 번호를 부여한다.
    • 메시지 처리 시 버전을 확인하여 중복 처리를 방지한다.
  4. 데이터베이스 제약 조건 활용:

    • 유니크 제약 조건을 사용하여 중복 데이터 삽입을 방지한다.

주의사항

  1. 성능 고려: 처리된 메시지 ID를 영구적으로 저장할 경우, 데이터베이스 부하가 증가할 수 있다.
  2. ID 관리: 메시지 ID의 유일성을 보장해야 한다.
  3. 동시성 처리: 여러 인스턴스가 동시에 같은 메시지를 처리하려 할 때의 동시성 문제를 고려해야 한다.

장점

  1. 데이터 일관성 유지: 중복 처리로 인한 데이터 불일치를 방지한다.
  2. 시스템 안정성 향상: 예기치 않은 중복 메시지 처리에 대비할 수 있다.
  3. 장애 복구 용이성: 시스템 장애 후 복구 시 안전하게 메시지를 재처리할 수 있다.

구현 예시

Javascript

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Idempotent Consumer 구현
class IdempotentConsumer {
    constructor(options = {}) {
        this.processedMessages = new Map();
        this.ttl = options.ttl || 24 * 60 * 60 * 1000; // 24시간
        this.deduplicationStore = new DeduplicationStore();
    }

    async processMessage(message) {
        const messageId = this.extractMessageId(message);

        try {
            // 메시지 중복 체크
            if (await this.isDuplicate(messageId)) {
                console.log(`Duplicate message detected: ${messageId}`);
                return this.handleDuplicate(message);
            }

            // 멱등성 키 저장
            await this.markAsProcessing(messageId);

            // 메시지 처리
            const result = await this.executeProcessing(message);

            // 처리 완료 표시
            await this.markAsCompleted(messageId, result);

            return result;
        } catch (error) {
            // 실패 처리
            await this.handleProcessingFailure(messageId, error);
            throw error;
        }
    }

    async isDuplicate(messageId) {
        return await this.deduplicationStore.exists(messageId);
    }

    extractMessageId(message) {
        // 메시지에서 고유 식별자 추출
        return message.id || message.messageId || crypto.randomUUID();
    }
}

Java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class IdempotentConsumer {
    private Set<String> processedMessageIds = new HashSet<>();

    public void processMessage(Message message) {
        String messageId = message.getId();
        if (!processedMessageIds.contains(messageId)) {
            // 메시지 처리 로직
            // ...

            // 처리 완료 후 ID 기록
            processedMessageIds.add(messageId);
        } else {
            System.out.println("중복 메시지 무시: " + messageId);
        }
    }
}

참고 및 출처