Publisher-Subscriber Pattern

Publisher-Subscriber Pattern(게시자-구독자 패턴)은 소프트웨어 아키텍처에서 중요한 디자인 패턴 중 하나로, 분산 시스템에서 비동기 통신을 구현하는 데 널리 사용된다.
이 패턴은 메시지를 보내는 발행자(Publisher)와 메시지를 받는 구독자(Subscriber) 사이의 느슨한 결합(Loose Coupling)을 제공하는 메시징 패턴으로, 발행자는 메시지를 특정 주제(Topic)나 채널로 발행하고, 해당 주제를 구독하는 모든 구독자들이 그 메시지를 받게 된다.
이 패턴은 컴포넌트 간의 느슨한 결합을 제공하여 확장성과 유연성을 높이는 데 기여한다.

logical components of Publisher-Subscriber Pattern
https://learn.microsoft.com/en-us/azure/architecture/patterns/publisher-subscriber

기본 개념

Publisher-Subscriber 패턴의 핵심 개념은 다음과 같다:

  1. 게시자(Publisher): 메시지를 생성하고 전송하는 컴포넌트이다.
  2. 구독자(Subscriber): 메시지를 수신하고 처리하는 컴포넌트이다.
  3. 메시지 브로커(Message Broker): 게시자와 구독자 사이에서 메시지를 중계하는 중간 컴포넌트이다.
  4. 토픽(Topic): 메시지를 분류하는 논리적 채널이다.

작동 방식

  1. 구독자는 관심 있는 토픽에 대해 메시지 브로커에 구독을 등록한다.
  2. 게시자는 특정 토픽에 대한 메시지를 메시지 브로커에 전송한다.
  3. 메시지 브로커는 해당 토픽을 구독한 모든 구독자에게 메시지를 전달한다.
  4. 구독자는 수신한 메시지를 비동기적으로 처리한다.

주요 특징

  1. 느슨한 결합: 게시자와 구독자는 서로의 존재를 모르며, 오직 메시지 브로커를 통해 통신한다.
  2. 확장성: 새로운 게시자나 구독자를 쉽게 추가할 수 있어 시스템 확장이 용이하다.
  3. 비동기 통신: 메시지 전송과 처리가 비동기적으로 이루어져 시스템의 응답성을 향상시킨다.
  4. 다대다 통신: 하나의 메시지가 여러 구독자에게 전달될 수 있다.

장점

  1. 유연성: 시스템 구성 요소를 독립적으로 개발하고 수정할 수 있다.
  2. 신뢰성: 메시지 브로커가 메시지 전달을 보장하여 시스템의 안정성을 높인다.
  3. 성능: 비동기 처리로 인해 전체 시스템의 성능이 향상된다.
  4. 이벤트 기반 아키텍처: 실시간 이벤트 처리와 반응형 시스템 구축에 적합하다.

단점

  1. 복잡성: 직접 통신에 비해 구현과 디버깅이 더 복잡할 수 있다.
  2. 메시지 순서: 메시지의 순서가 보장되지 않을 수 있다.
  3. 단일 실패 지점: 메시지 브로커가 시스템의 단일 실패 지점이 될 수 있다.
  4. 구독자 상태 파악: 구독자의 건강 상태를 확인하기 어려울 수 있다.

사용 사례

  1. 이벤트 알림 시스템: 대규모 사용자에게 실시간 알림을 전송하는 경우.
  2. 분산 캐싱: 여러 서버 간의 캐시 동기화.
  3. 마이크로서비스 아키텍처: 서비스 간 비동기 통신.
  4. IoT 시스템: 센서 데이터 수집 및 처리.
  5. 실시간 분석: 대량의 데이터 스트림 처리.

코드 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// publisher.js
const Redis = require('ioredis');
const publisher = new Redis();

class NewsPublisher {
    constructor() {
        this.publisher = publisher;
    }

    // 뉴스 발행 메서드
    async publishNews(category, newsData) {
        try {
            const message = {
                id: Date.now(),
                category,
                content: newsData,
                timestamp: new Date().toISOString()
            };

            // 카테고리별 채널로 뉴스 발행
            await this.publisher.publish(
                `news:${category}`, 
                JSON.stringify(message)
            );

            console.log(`Published news to ${category}:`, message);
            return true;
        } catch (error) {
            console.error('Error publishing news:', error);
            return false;
        }
    }
}

// subscriber.js
const Redis = require('ioredis');
const subscriber = new Redis();

class NewsSubscriber {
    constructor(categories) {
        this.subscriber = subscriber;
        this.categories = categories;
        this.setup();
    }

    setup() {
        // 각 카테고리 채널 구독
        this.categories.forEach(category => {
            this.subscriber.subscribe(`news:${category}`);
        });

        // 메시지 수신 이벤트 처리
        this.subscriber.on('message', (channel, message) => {
            const news = JSON.parse(message);
            this.handleNews(channel, news);
        });
    }

    handleNews(channel, news) {
        console.log(`Received news on ${channel}:`, news);
        // 실제 뉴스 처리 로직 구현
    }
}

// 메시지 브로커 역할을 하는 이벤트 버스
class EventBus {
    constructor() {
        this.redis = new Redis();
        this.channels = new Map();
    }

    // 메시지 필터링 및 전달 관리
    async handleMessage(channel, message) {
        const subscribers = this.channels.get(channel) || [];
        const parsedMessage = JSON.parse(message);

        // 메시지 유효성 검사
        if (!this.validateMessage(parsedMessage)) {
            console.error('Invalid message format:', parsedMessage);
            return;
        }

        // 구독자들에게 메시지 전달
        subscribers.forEach(subscriber => {
            try {
                subscriber.handleNews(channel, parsedMessage);
            } catch (error) {
                console.error('Error delivering message to subscriber:', error);
            }
        });
    }

    validateMessage(message) {
        return message.id && message.category && message.content;
    }
}

참고 및 출처