Event-Bus Pattern

소프트웨어 시스템의 컴포넌트 간 통신을 단순화하고 유연성을 높이는 아키텍처 패턴이다.
이 패턴은 발행-구독(Publish-Subscribe) 모델을 기반으로 하며, 컴포넌트 간의 느슨한 결합을 촉진한다.

장점

  1. 느슨한 결합: 컴포넌트 간 직접적인 의존성이 줄어들어 시스템의 유연성이 향상된다.
  2. 확장성: 새로운 컴포넌트를 쉽게 추가하거나 제거할 수 있어 시스템 확장이 용이한다.
  3. 비동기 통신: 이벤트 기반의 비동기 통신으로 시스템의 반응성과 성능이 향상된다.
  4. 단순화된 통신: 복잡한 컴포넌트 간 통신 로직을 단순화할 수 있다.

단점

  1. 복잡성 증가: 시스템 전체의 흐름을 파악하기 어려울 수 있다.
  2. 메모리 사용 증가: 모든 구독자에게 이벤트가 전달되므로 메모리 사용량이 증가할 수 있다.
  3. 디버깅의 어려움: 비동기적 특성으로 인해 문제 추적이 어려울 수 있다.

핵심 구성요소

https://medium.com/elixirlabs/event-bus-implementation-s-d2854a9fafd5
Event Bus with multiple subscribers(green arrows) and notifiers(red arrows)

  1. 이벤트(Event):
    이벤트는 시스템에서 발생한 중요한 변화나 상태를 나타낸다.
    각 이벤트는 보통 다음과 같은 정보를 포함한다:

    • 이벤트 타입: 어떤 종류의 이벤트인지 구분
    • 타임스탬프: 이벤트 발생 시간
    • 데이터: 이벤트와 관련된 구체적인 정보
    • 이벤트 ID: 이벤트를 고유하게 식별하는 값
  2. 이벤트 버스(Event Bus):
    이벤트 버스는 이벤트의 발행과 구독을 관리하는 중앙 컴포넌트이다.
    주요 기능은 다음과 같다:

    • 이벤트 구독 관리: 특정 이벤트에 대한 구독자 등록/해제
    • 이벤트 발행: 발행된 이벤트를 관련 구독자들에게 전달
    • 이벤트 큐잉: 비동기 처리를 위한 이벤트 큐 관리
    • 에러 처리: 이벤트 처리 중 발생하는 예외 상황 관리
  3. 발행자(Publisher):
    이벤트를 발생시키는 컴포넌트.
    발행자는 다음과 같은 특징을 가진다:

    • 이벤트 버스에만 의존하며, 구독자에 대해 알 필요가 없음
    • 이벤트 생성 및 발행에만 책임을 가짐
    • 이벤트 처리 결과에 대해 관여하지 않음
  4. 구독자(Subscriber):
    특정 이벤트에 관심이 있는 컴포넌트.
    구독자의 특징은 다음과 같다:

    • 관심 있는 이벤트 타입을 이벤트 버스에 등록
    • 이벤트 발생 시 자동으로 통지받음
    • 이벤트 처리 로직 구현

작동 방식

  1. 이벤트 생성자가 이벤트를 생성하고 이벤트 버스에 발행한다.
  2. 이벤트 버스는 발행된 이벤트를 받아 적절한 구독자에게 전달한다.
  3. 구독자는 자신이 관심 있는 이벤트만을 수신하고 처리한다.
  4. 이벤트 처리가 완료되면 해당 이벤트는 소멸된다.

구현 예시

이벤트 버스 패턴은 다양한 프로그래밍 언어와 프레임워크에서 구현할 수 있다.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
from typing import Dict, List, Callable
from dataclasses import dataclass
from datetime import datetime
import threading
import queue
import uuid

# 이벤트 기본 클래스
@dataclass
class Event:
    event_type: str
    timestamp: datetime
    data: dict
    event_id: str = str(uuid.uuid4())

# 이벤트 버스 구현
class EventBus:
    def __init__(self):
        # 이벤트 타입별 구독자 목록
        self._subscribers: Dict[str, List[Callable]] = {}
        # 이벤트 큐
        self._event_queue = queue.Queue()
        # 이벤트 처리 스레드
        self._processing_thread = threading.Thread(target=self._process_events)
        self._is_running = True
        self._processing_thread.start()

    def subscribe(self, event_type: str, handler: Callable):
        """이벤트 타입에 대한 핸들러 등록"""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler)
    
    def publish(self, event: Event):
        """이벤트 발행"""
        self._event_queue.put(event)

    def _process_events(self):
        """이벤트 처리 루프"""
        while self._is_running:
            try:
                event = self._event_queue.get(timeout=1.0)
                self._dispatch_event(event)
            except queue.Empty:
                continue

    def _dispatch_event(self, event: Event):
        """등록된 핸들러들에게 이벤트 전달"""
        if event.event_type in self._subscribers:
            for handler in self._subscribers[event.event_type]:
                try:
                    handler(event)
                except Exception as e:
                    print(f"Error handling event {event.event_id}: {e}")

    def shutdown(self):
        """이벤트 버스 종료"""
        self._is_running = False
        self._processing_thread.join()

# 예시: 주문 시스템에서의 사용
class OrderService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

    def place_order(self, order_data: dict):
        # 주문 처리 로직
        print(f"Processing order: {order_data}")
        
        # 주문 완료 이벤트 발행
        event = Event(
            event_type="OrderPlaced",
            timestamp=datetime.now(),
            data=order_data
        )
        self.event_bus.publish(event)

class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # OrderPlaced 이벤트 구독
        self.event_bus.subscribe("OrderPlaced", self.handle_order_placed)

    def handle_order_placed(self, event: Event):
        print(f"Updating inventory for order: {event.data}")
        # 재고 업데이트 로직

class NotificationService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # OrderPlaced 이벤트 구독
        self.event_bus.subscribe("OrderPlaced", self.handle_order_placed)

    def handle_order_placed(self, event: Event):
        print(f"Sending notification for order: {event.data}")
        # 알림 발송 로직

# 사용 예시
if __name__ == "__main__":
    event_bus = EventBus()
    
    order_service = OrderService(event_bus)
    inventory_service = InventoryService(event_bus)
    notification_service = NotificationService(event_bus)

    # 주문 처리
    order_service.place_order({"order_id": "123", "product": "laptop"})

적용 사례

이벤트 버스 패턴은 다음과 같은 상황에서 특히 유용하다:

  • 마이크로서비스 아키텍처에서 서비스 간 통신
  • 복잡한 사용자 인터페이스에서 컴포넌트 간 상태 동기화
  • 실시간 데이터 처리 시스템
  • 분산 시스템에서의 이벤트 기반 워크플로우

참고 및 출처