Message Queues and Streams

소프트웨어 아키텍처 패턴 중 “Message Queues and Streams"는 분산 시스템에서 중요한 역할을 하는 통신 메커니즘이다. 이 두 가지 패턴은 비슷해 보이지만 각각 고유한 특성과 용도를 가지고 있다.

Message Queues와 Streams는 각각 고유한 장단점을 가지고 있다. 메시지 큐는 작업의 안정적인 처리와 시스템 간 결합도 감소에 적합하며, 스트림은 실시간 데이터 처리와 분석에 더 적합하다.
프로젝트의 요구사항과 특성에 따라 적절한 패턴을 선택하거나 두 패턴을 조합하여 사용할 수 있다.

Message Queues (메시지 큐)

메시지 큐는 애플리케이션 간 비동기 통신을 위한 중간 계층 역할을 한다.

주요 특징

  1. 비동기 통신: 생산자(Producer)와 소비자(Consumer) 간 시간적 결합을 제거한다.
  2. 내구성: 메시지는 소비될 때까지 큐에 안전하게 저장된다.
  3. 순서 보장: 대부분의 경우 FIFO(First In, First Out) 방식으로 메시지를 처리한다.
  4. 단일 소비: 각 메시지는 일반적으로 하나의 소비자에 의해서만 처리된다.

사용 사례

  1. 작업 큐: 백그라운드 작업 처리 (예: 이미지 처리, 이메일 발송)
  2. 부하 분산: 다수의 Consumer가 메시지를 처리하여 부하를 분산
  3. 마이크로서비스 통신: 서비스 간 느슨한 결합을 위한 비동기 통신

장점

  1. 확장성: 생산자와 소비자를 독립적으로 확장할 수 있다.
  2. 안정성: 시스템 일부가 실패해도 전체에 영향을 미치지 않는다.
  3. 보장성: 작업의 완료 여부를 확인할 수 있다.

구현 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 메시지 큐 구현 예시
from queue import Queue
import threading
import time

class MessageQueue:
    def __init__(self):
        self.queue = Queue()
        
    def produce(self, message):
        # 메시지를 큐에 추가
        self.queue.put(message)
        print(f"생산됨: {message}")
        
    def consume(self):
        # 큐에서 메시지를 가져와 처리
        while True:
            message = self.queue.get()
            print(f"처리됨: {message}")
            time.sleep(1)  # 처리 시간 시뮬레이션

Streams (스트림)

스트림은 연속적인 데이터 흐름을 실시간으로 처리하는 메커니즘이다.

주요 특징

  1. 실시간 처리: 데이터가 도착하는 즉시 처리할 수 있다.
  2. 지속성: 데이터는 일정 기간 동안 저장되어 재생 가능하다.
  3. 다중 소비: 여러 소비자가 동일한 데이터를 동시에 읽을 수 있다.
  4. 높은 처리량: 대량의 데이터를 지속적으로 처리할 수 있다.

사용 사례

  1. 실시간 분석: 금융 거래 모니터링, 사기 탐지 등
  2. IoT 데이터 처리: 센서 데이터의 실시간 수집 및 분석
  3. 로그 처리: 대규모 시스템의 로그 실시간 모니터링

장점

  1. 실시간 인사이트: 데이터가 생성되는 즉시 분석 가능
  2. 확장성: 수평적 확장을 통해 대규모 데이터 처리 가능
  3. 복잡한 처리: 시간 윈도우, 이벤트 시간 처리 등 고급 기능 제공

구현 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 스트림 처리 예시
const dataStream = new ReadableStream({
    start(controller) {
        // 데이터 스트림 시작
        setInterval(() => {
            const data = generateData();
            controller.enqueue(data);
        }, 1000);
    },
    
    pull(controller) {
        // 데이터 요청 시 처리
    },
    
    cancel() {
        // 스트림 종료 처리
    }
});

Message Queues vs. Streams 비교

특성Message QueuesStreams
데이터 처리배치 처리 가능실시간 분석
메시지 소비일반적으로 한 번만여러 번 가능
데이터 유지소비 후 삭제일정 기간 유지
처리 방식비동기동기 및 연속적
확장성소비자 추가로 확장노드 추가로 수평 확장

실제 활용 사례

메시지 큐 활용:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# RabbitMQ를 사용한 메시지 큐 예시
import pika

class OrderProcessor:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
    def send_order(self, order):
        # 주문 메시지 전송
        self.channel.basic_publish(
            exchange='',
            routing_key='orders',
            body=order
        )
        
    def process_orders(self):
        # 주문 처리
        def callback(ch, method, properties, body):
            print(f"주문 처리: {body}")
            
        self.channel.basic_consume(
            queue='orders',
            on_message_callback=callback,
            auto_ack=True
        )
        self.channel.start_consuming()

아키텍처 구성 요소

주요 구성 요소:

확장성과 성능 고려사항

시스템 설계 시 고려할 점:

최신 트렌드와 도구

현대적인 접근 방식:


참고 및 출처