Queue vs. Topic

Queue 와 Topic 은 메시지 기반 시스템에서 데이터 전달 구조의 핵심 요소로, 시스템 아키텍처와 소비자 처리 방식에 따라 선택된다. Queue 는 Point-to-Point 방식으로 하나의 소비자에게 메시지를 전달하며 작업 분산에 유리하다. 반면 Topic 은 Publish-Subscribe 모델로, 다수의 구독자가 동일한 메시지를 수신할 수 있어 이벤트 브로드캐스팅이나 실시간 알림에 적합하다. 각 방식은 메시지 순서, 중복 수신, 확장성 등 다양한 구현 요소에서 차이를 가지며, 적절한 선택이 시스템 안정성과 확장성에 큰 영향을 미친다.

핵심 개념

기본 개념

Queue (큐)

  • 정의: Point-to-Point 메시징 패턴으로, 메시지가 단일 소비자에게 전달되는 통신 모델
  • 특성: FIFO (First-In-First-Out) 순서 보장, 메시지 지속성, 부하 분산
  • 동작 방식: 프로듀서가 큐에 메시지를 전송하면, 여러 컨슈머 중 하나만이 해당 메시지를 소비

Topic (토픽)

  • 정의: Publish-Subscribe 메시징 패턴으로, 메시지가 여러 구독자에게 브로드캐스트되는 통신 모델
  • 특성: 일대다 통신, 메시지 복제, 실시간 이벤트 전파
  • 동작 방식: 퍼블리셔가 토픽에 메시지를 게시하면, 해당 토픽을 구독하는 모든 컨슈머가 메시지 사본을 수신

실무 구현을 위한 연관성 분석

메시지 브로커 (Message Broker) 연관성

  • Apache Kafka, RabbitMQ, ActiveMQ 등에서 Queue 와 Topic 패턴 모두 지원
  • 브로커는 메시지 라우팅, 지속성, 보안 기능 제공

확장성 (Scalability) 측면

  • Queue: 컨슈머 그룹을 통한 수평적 확장
  • Topic: 파티셔닝과 구독자 확장을 통한 대규모 처리

내결함성 (Fault Tolerance) 구현

  • 메시지 복제, 확인 응답 (Acknowledgment), 재시도 메커니즘
  • Dead Letter Queue/Topic 을 통한 실패 메시지 처리

Queue vs. Topic 비교

Queue 와 Topic 은 메시징 시스템 내에서 메시지 전달, 소비 방식, 신뢰성, 스케일링 방식, 활용 목적 등 다양한 기준에서 차이가 난다.

기본 특성 비교

구분QueueTopic
통신 모델Point-to-PointPublish-Subscribe
메시지 소비단일 컨슈머다중 구독자
메시지 순서FIFO 보장파티션 내에서만 보장
메시지 지속성소비 후 삭제보존 기간에 따라 유지
부하 분산Round-robin 방식구독자별 독립적 처리

성능 특성 비교

구분QueueTopic
처리량 (Throughput)중간 - 높음높음
지연 시간 (Latency)낮음중간
메모리 사용량낮음높음 (구독자 수에 비례)
네트워크 사용량효율적구독자 수에 따라 증가
확장성제한적높음

사용 사례 비교

구분QueueTopic
작업 분산적합부적합
이벤트 브로드캐스트부적합적합
데이터 스트리밍제한적적합
마이크로서비스 통신적합적합
실시간 알림부적합적합

작동 방식

Queue 작동 원리

sequenceDiagram
    participant P as Producer
    participant Q as Queue
    participant C1 as Consumer 1
    participant C2 as Consumer 2
    
    P->>Q: Send Message 1
    P->>Q: Send Message 2
    P->>Q: Send Message 3
    
    Q->>C1: Deliver Message 1
    C1->>Q: Acknowledge
    
    Q->>C2: Deliver Message 2
    C2->>Q: Acknowledge
    
    Q->>C1: Deliver Message 3
    C1->>Q: Acknowledge

Topic 작동 원리

sequenceDiagram
    participant P as Publisher
    participant T as Topic
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2
    
    P->>T: Publish Message
    
    T->>S1: Broadcast Copy 1
    T->>S2: Broadcast Copy 2
    
    S1->>T: Acknowledge
    S2->>T: Acknowledge

도전 과제

카테고리도전 과제원인 또는 설명실무적 영향대응 전략 / 해결 방안
확장성/부하 분산파티션 불균형, 소비자 병목키 기반 파티셔닝 불균형, 핫 파티션, 소비자 수 제한특정 파티션/큐에 부하 집중, 처리량 저하키 해시 분산 설계, 파티션 재조정, 컨슈머 그룹 확장, 큐 샤딩
스케일링 안정성컨슈머 리밸런싱 처리 지연컨슈머 동적 증가/감소, 네트워크 장애 발생 시 리밸런싱 이벤트 트리거됨일시적 처리 중단, 메시지 재전송 발생점진적 스케일링, 상태 기반 할당, 협력적 리밸런싱 (Cooperative Rebalancing)
성능/처리량직렬화 오버헤드, 백프레셔무거운 메시지 포맷 (JSON 등), 빠른 생산자 - 느린 소비자 속도 불일치CPU 부하, 처리량 저하, 큐 적체, 시스템 불안정Avro/Protobuf 적용, 배치 처리, prefetch/poll 설정 조정, 회로 차단기 패턴 (Circuit Breaker)
일관성/정확성메시지 순서 보장 어려움병렬 소비, 토픽 파티션 분산 처리, 소비자 순서 무시이벤트 순서 불일치로 인한 비즈니스 로직 오류파티션 키 고정, 단일 소비자 처리, Kafka FIFO 처리, Vector Clock, 이벤트 소싱 적용
신뢰성메시지 유실, 중복 처리 문제Ack 실패, 컨슈머 장애, 중복 전송으로 인해 발생데이터 누락, 중복 처리, 결과 무결성 손상DLQ 구성, Ack + Retry 정책, 멱등성 (ID+Hash 기반) 소비자 구현
운영/관찰성메시지 흐름 및 상태 추적 어려움큐는 처리 후 삭제, 토픽은 병렬 소비자 존재 → 추적 어려움SLA 준수 실패, 디버깅 난이도 증가Trace ID 삽입, 중앙 로깅 + 분산 트레이싱 도입 (OpenTelemetry, Zipkin 등)
통합/마이그레이션큐 ↔ 토픽 전환 시 데이터 정합성 보장 문제구조 전환 시 메시지 중복/유실 가능, 중단 없는 서비스 이전 어려움전환 중단 시 서비스 중단 또는 누락 이벤트 발생 가능중간 브로커 구성, 양방향 브릿지 메시징, 임시 병행 구독 전략
아키텍처 복잡도혼합 패턴 구성 시 복잡성 증가Queue/Topic 혼합 사용 시 흐름 파악, 장애 추적, 권한 설정 등 어려움운영 비용 증가, 장애 확산 시 전체 영향도 증가도메인별 메시징 전략 수립, 토픽 명명 일관화, 문서화 및 메시지 흐름 시각화
보안/접근 제어구독자/컨슈머 인증/인가 누락ACL 미적용, 인증 토큰 부재, 메시지 노출 우려보안 취약점, 메시지 도난 가능성Role 기반 RBAC, TLS 암호화, JWT/OAuth 기반 메시지 인증 적용
  • Queue 기반 시스템은 확장성과 병렬성 한계, 처리 병목이 주요 이슈이며, 소비자 확장 전략작업 큐 분산 설계가 핵심.
  • Topic 기반 시스템병렬성은 뛰어나나 순서 보장, 메시지 정합성 유지, 모니터링에서 도전 과제가 두드러짐.
  • 모든 아키텍처 공통 이슈는 신뢰성 확보 (DLQ, 멱등성), 관찰성 향상 (Trace, 상태 저장), 스케일 안정화 (리밸런싱, 파티션 튜닝) 에 집중해야 한다.

실무에서 효과적으로 적용하기 위한 고려사항 및 주의할 점

카테고리고려 항목Queue(큐) 고려사항Topic(토픽) 고려사항권장 사항 및 주의점
전달 패턴메시징 모델1:1 처리 (Point-to-Point)1:N 처리 (Publish-Subscribe)요구 사항에 따라 메시징 패턴을 일관되게 설계
순서 보장FIFO 처리단일 소비자 구성 시 순서 보장. FIFO Queue 또는 순차 처리 설계 필요Partition 내 순서 보장만 가능, 전체 순서 보장은 어려움순서 요구 시 단일 파티션/소비자 설정 또는 순서 제어 로직 구현
확장성소비자 수 및 병렬 처리소비자 수를 늘려 작업 병렬화 (Worker Pool)Partition + Consumer Group 구조로 병렬 소비Partition 수 ≥ Consumer 수 권장. 파티션 증설 시 메시지 재분배 고려 필요
장애 복원력장애 및 실패 대응DLQ 구성 필수. 메시지 누락 대비 Ack 기반 재처리 로직 필요Durable Subscription, Offset 기반 재처리 및 Replay 가능DLQ + 재처리 전략 함께 구성. Topic 은 재생 정책 (log compaction 등) 고려
중복 방지Idempotency 처리동일 메시지 재수신 방지를 위한 멱등성 설계 필요각 구독자 단위로 중복 수신 방지 처리 필요메시지 해시, UUID, Redis Lock 등 멱등성 로직 적용
모니터링시스템 상태 추적큐 깊이, 메시지 지연, 소비 실패 추적구독자별 처리율, Offset lag, 파티션 상태 추적Prometheus + Grafana, DLQ 로그 추적, APM 연계
성능 최적화처리량/지연 튜닝Batch 전송, Prefetch 설정, 워커 수 조정Compression, BatchSize, Zero-Copy 전송 등Throughput ↔ Latency 트레이드오프 균형화 설계
운영 관리메시지 보존 및 재처리일반적으로 소비 후 메시지 삭제메시지 로그 보존 설정으로 재처리 및 분석 가능 (Kafka log.retention 등)장기 보관 필요 시 Topic 선택 + TTL 설정 조합
보안인증/암호화/권한 관리TLS, 사용자 인증, 역할 기반 접근 제어 (RBAC)TLS, 구독 필터링, ACL 및 Consumer 인증민감도에 따라 차등 보안 설정, 브로커 레벨 + 애플리케이션 레벨 이중 보안 적용
운영 자동화오토스케일링/백프레셔메시지 증가 시 Worker 자동 확장 + 메시지 Pull 속도 조절소비 속도에 따라 Backpressure 전략 도입 필요KEDA, Kubernetes HPA 연동, Kafka Lag 기반 스케일링 트리거 고려
설계 복잡도혼합 설계 시 주의점Queue/Topic 혼용 시 데이터 흐름 혼란 및 유지보수 난이도 증가 가능기능 분리 목적의 멀티 토픽 구성 시 구독자 설계가 복잡해질 수 있음도메인 기준 메시징 구조 일관성 확보. 기능별 별도 토픽 설계로 복잡도 분리
스키마 설계메시지 포맷/진화JSON/Protobuf/Avro 등 표준 포맷 유지 필요스키마 진화 발생 시 하위 호환성 확보 필요Schema Registry 도입 권장, 버전 관리 포함

실무 설계 및 운영 팁 요약

항목권장 전략 요약
순서 보장FIFO 가 중요한 경우 큐 사용. Topic 은 파티션 내부에서만 순서 보장 가능
확장성과 병렬 처리Topic + Partition + Consumer Group 조합이 가장 효율적인 병렬 처리 구조
신뢰성과 재처리DLQ, Retry Queue, Ack/Nack 조합을 통한 메시지 유실 방지 전략 필수
모니터링 및 자동화Kafka Lag, Queue Length, Failure Count 등 지표 기반 오토스케일링 및 알림 시스템 구축 권장
보안TLS, 인증 키, 토픽 구독 필터링, RBAC 조합으로 전방위 보안 체계 구축 가능
복합 구조 설계 시CQRS, Saga 등 복합 아키텍처에서 Queue 와 Topic 을 혼합 사용 시 도메인 기반 구분 및 명확한 흐름 설계 필수

최적화하기 위한 고려사항 및 주의할 점

카테고리최적화 항목Queue(큐) 최적화 전략Topic(토픽) 최적화 전략실무 권장사항 요약
메시지 크기/포맷메시지 크기 및 직렬화 최적화메시지 크기 10MB 이하 유지, JSON 대신 Avro/Protobuf 활용이벤트 페이로드 최소화, 경량 직렬화 포맷 사용JSON → Avro 전환, Schema Registry 로 일관성 유지
처리량/지연 시간병렬 처리 및 ThroughputPrefetch 조정, Consumer 수 증가, 멀티 워커 구성파티션 수 증가, 컨슈머 그룹 병렬 처리처리량 기준 → Partition/Consumer 수 동적 조정
백프레셔소비자 부하 제어메시지 rate limit 설정, reject + requeue 전략구독자 수 조절, 이벤트 필터링/샤딩 적용처리 병목 시 prefetch 또는 poll 조정, CPU 모니터링 필수
리소스 최적화시스템 자원 할당 및 스케일링CPU/메모리/디스크 모니터링, 워커 오토스케일링 구성브로커/컨슈머 오토스케일링, 클러스터 모니터링Kubernetes + HPA/KEDA 조합 권장
스토리지 최적화메시지 저장소 관리TTL 설정, 수동/자동 배치 삭제, 디스크 압축 설정Retention 정책 설정, 파티션 간 균등 분산Kafka 는 retention.ms, RabbitMQ 는 max-length 등 관리 필요
신뢰성/오류 처리장애 대응 전략DLQ 구성, 재시도 + 지수 백오프, manual ack 설정Durable Subscription, 재전송 정책, Offset commit 전략DLQ 는 필수 구성 요소, 오류 로깅 및 Alerting 도입 권장
확장성 설계수평 확장 구조큐 단위 분리, 컨슈머 워커 클러스터 구성파티션 기반 구독자 병렬화, 구독자 샤딩 및 부하 분산사용자 그룹별 큐 or 파티션 구성, 클러스터 수준 확장 고려
지연 최소화네트워크/연결 관리연결 풀링, 메시지 배치 처리, 전용 회선 구성로컬 배치 처리, 고속 네트워크 구성, 메시지 필터링 적용네트워크 지연 발생 시 RPC → 비동기 메시징 전환 고려
중복/멱등성 보장Idempotency메시지 해시, Unique ID, 중복 방지 DB Key 적용이벤트 ID 기반 멱등성 로직, 구독자별 deduplication 처리재처리에도 안전한 메시지 구조 설계 (e.g., DB Upsert 패턴)
운영 편의성모니터링 및 Alerting큐 길이, 처리 시간, 실패률 모니터링 + Alert 구성토픽 오프셋 지연, 구독자 상태 모니터링Prometheus + Grafana, Kafka UI, RabbitMQ Management Plugin 활용
  • Queue 는 작업 단위 (Task) 중심 처리에 최적화되어 있고, Topic 은 이벤트 브로드캐스트 처리에 강점을 가진다.
  • **공통적으로 병목 구간 식별, 리소스 모니터링, 자동화된 장애 처리 (DLQ + Alert)**가 중요하다.
  • Kafka, RabbitMQ, SQS/SNS 등 각 브로커에 따라 일부 설정 방식이 다르므로 시스템에 맞는 최적화 전략 수립 필요.
  • 직렬화 포맷과 메시지 크기 최적화는 네트워크/스토리지 부하를 줄이는 데 핵심이다.

실무 적용 예시

도메인/시스템사용 목적메시징 방식적용 방식 예시기대 효과 및 특징
주문 처리 시스템비동기 작업 처리Queue주문 요청 → 큐 (RabbitMQ/SQS) → 백엔드 워커중복 방지, 신뢰성 확보, 부하 분산
알림 시스템실시간 이벤트 브로드캐스트Topic이벤트 발생 → 토픽 (Kafka/PubSub) → 모바일/Web/Slack 등 다중 구독자동시 사용자 대상 브로드캐스트, 반응 속도 향상
로그 수집 및 분석로그 집계 및 전처리Queue서버 로그 → 큐 → 로그 파서/적재기 → DB/S3 등안정적인 수집, 실패 메시지 재처리, 단일 소비 구조
다중 로그 시스템 연동Topic앱 로그 → 토픽 → Kibana + S3 + Data Lake 등다수 분석 시스템에 병렬 전송, 유연한 로그 분기 처리
IoT/센서 데이터대규모 이벤트 스트림 처리Topic센서 이벤트 → 토픽 → 실시간 분석기 + 알림 시스템실시간 통계, 이벤트 기반 경고 시스템 구축 가능
ETL 파이프라인배치/순차 처리QueueData Ingest → 큐 → 변환 작업 순차 수행처리 순서 보장, 실패 처리 용이
실시간 스트리밍 처리Topic실시간 데이터 수집 → 토픽 → 분석기, 대시보드, 저장 시스템 병렬 전달고속 처리, 병렬 확장, 비동기 이벤트 기반 처리
마이크로서비스 통신명령/작업 분산 전달QueueA 서비스 → 큐 → B 서비스 워커로 명령 전달명령 처리의 확장성, 느슨한 결합, 트랜잭션 가능
이벤트 기반 상호 작용Topic도메인 이벤트 → 토픽 발행 → 여러 서비스 구독 (이메일, 포인트, 감사 로그 등)CQRS/Event Sourcing 구현, 기능 확장 용이
데이터 적재/복제멀티 싱크 처리Topic원본 시스템 → 토픽 → 여러 데이터 저장소로 동시에 전송일관된 데이터 적재, 리플레이/재처리 가능
지속 가능성/모니터링장애 대비, 재처리DLQ (보조 구성)처리 실패 메시지 → DLQ → 재시도 큐 또는 관리자 알림 처리신뢰성 향상, 재처리 전략 수립 용이

활용 사례

사례 1: 실시간 알림 시스템 - Topic 패턴

시스템 구성

flowchart TD
    Backend[Backend Service] --> Broker[메시징 브로커]
    Broker --> Topic[알림 토픽]
    Topic --> User1[User App 1]
    Topic --> User2[User App 2]
    Topic --> User3[User App 3]
    Broker -.-> DLQ

Workflow

  1. Backend 에서 알림 이벤트 생성 후 토픽으로 발행
  2. 토픽 구독 중인 모든 User App 에게 동시에 메시지 전송
  3. 개별 앱이 각자 알림 확인, 실패 시 DLQ 보관

주제의 역할: 동일 이벤트의 멀티 소비, 브로드캐스트, 확장성과 실시간성
유무 차이: Topic 없으면 각 대상마다 별도 메시지 송신 필요 → 시스템 과부하, 누락/지연 위험

구현 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Kafka 토픽 방식 알림 발행 및 소비 예시

from kafka import KafkaProducer, KafkaConsumer

# Producer: 알림 이벤트 발행
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('notify-topic', b'{"event":"ALERT","msg":"서버 점검 공지"}')

# Consumer: 알림 구독
consumer = KafkaConsumer(
    'notify-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='user-app'
)
for msg in consumer:
    print(f"[User App] 수신된 알림: {msg.value.decode()}")
# 여러 User App 인스턴스가 동시에 구독하면 각각 메시지 수신

사례 2: 전자상거래 주문 처리 시스템

시스템 구성:

Queue 기반 주문 처리

graph LR
    UI[웹 인터페이스] --> OQ[주문 큐]
    OQ --> OP1[주문 처리 워커 1]
    OQ --> OP2[주문 처리 워커 2]
    OQ --> OP3[주문 처리 워커 3]
    OP1 --> DB[(주문 DB)]
    OP2 --> DB
    OP3 --> DB

Topic 기반 이벤트 처리

graph TB
    OP[주문 처리 완료] --> ET[이벤트 토픽]
    ET --> IS[재고 서비스]
    ET --> NS[알림 서비스]
    ET --> AS[분석 서비스]
    ET --> BS[빌링 서비스]

Workflow:

  • Queue 워크플로우
    1. 사용자 주문 접수 → 주문 큐에 메시지 추가
    2. 주문 처리 워커들이 경쟁적으로 주문 획득
    3. 주문 검증 및 처리 → 데이터베이스 업데이트
    4. 처리 완료 확인 응답
  • Topic 워크플로우
    1. 주문 처리 완료 → 이벤트 토픽에 발행
    2. 재고 서비스: 재고 차감
    3. 알림 서비스: 고객 알림 발송
    4. 분석 서비스: 판매 데이터 수집
    5. 빌링 서비스: 정산 처리

Queue vs. Topic 유무에 따른 차이점

  • Queue 미사용 시
    • 동기식 처리로 응답 시간 증가
    • 트래픽 급증 시 시스템 과부하
    • 단일 장애점 위험 증가
  • Topic 미사용 시
    • 서비스 간 강한 결합
    • 순차적 처리로 전체 지연 발생
    • 새로운 서비스 추가 시 기존 코드 수정 필요

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
"""
Apache Kafka를 활용한 Queue와 Topic 패턴 구현
- kafka-python 라이브러리 사용
- Queue: 단일 파티션 + 컨슈머 그룹 활용
- Topic: 다중 파티션 + 다중 컨슈머 그룹 활용

설치 필요 라이브러리:
pip install kafka-python
"""

import asyncio
import json
import logging
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, asdict
from datetime import datetime
import uuid
import threading
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError, KafkaError
import signal
import sys

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class KafkaMessage:
    """Kafka 메시지 클래스"""
    id: str
    content: Dict[str, Any]
    timestamp: datetime
    partition: Optional[int] = None
    offset: Optional[int] = None
    key: Optional[str] = None
    
    def to_dict(self) -> Dict:
        """메시지를 딕셔너리로 변환"""
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        return data
    
    @classmethod
    def from_kafka_message(cls, msg) -> 'KafkaMessage':
        """Kafka ConsumerRecord에서 KafkaMessage 생성"""
        try:
            content = json.loads(msg.value.decode('utf-8'))
            return cls(
                id=content.get('id', str(uuid.uuid4())),
                content=content.get('content', {}),
                timestamp=datetime.fromisoformat(content.get('timestamp', datetime.now().isoformat())),
                partition=msg.partition,
                offset=msg.offset,
                key=msg.key.decode('utf-8') if msg.key else None
            )
        except Exception as e:
            logger.error(f"메시지 파싱 오류: {e}")
            return cls(
                id=str(uuid.uuid4()),
                content={"raw_value": msg.value.decode('utf-8', errors='ignore')},
                timestamp=datetime.now(),
                partition=msg.partition,
                offset=msg.offset
            )

class KafkaQueueProducer:
    """
    Kafka를 활용한 Queue 패턴 프로듀서
    - 단일 파티션으로 순서 보장
    - Round-robin 방식으로 컨슈머에게 분배
    """
    
    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self._setup_producer()
    
    def _setup_producer(self):
        """Kafka 프로듀서 설정"""
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                key_serializer=lambda x: x.encode('utf-8') if x else None,
                # 안정성을 위한 설정
                acks='all',  # 모든 replica 확인
                retries=3,   # 재시도 횟수
                enable_idempotence=True,  # 중복 방지
                max_in_flight_requests_per_connection=1  # 순서 보장
            )
            logger.info("Kafka 프로듀서 초기화 완료")
        except Exception as e:
            logger.error(f"Kafka 프로듀서 초기화 실패: {e}")
            raise
    
    async def send_to_queue(self, topic_name: str, content: Dict[str, Any], 
                           key: Optional[str] = None) -> str:
        """
        Queue에 메시지 전송
        Args:
            topic_name: 토픽 이름
            content: 메시지 내용
            key: 파티션 키 (순서 보장용)
        Returns:
            str: 메시지 ID
        """
        message_id = str(uuid.uuid4())
        message_data = {
            'id': message_id,
            'content': content,
            'timestamp': datetime.now().isoformat()
        }
        
        try:
            # 비동기 전송
            future = self.producer.send(
                topic_name,
                value=message_data,
                key=key,
                partition=0  # Queue 패턴을 위해 단일 파티션 사용
            )
            
            # 전송 결과 대기
            record_metadata = future.get(timeout=10)
            
            logger.info(f"Queue 메시지 전송 완료 - Topic: {topic_name}, "
                       f"ID: {message_id}, Partition: {record_metadata.partition}, "
                       f"Offset: {record_metadata.offset}")
            
            return message_id
            
        except Exception as e:
            logger.error(f"Queue 메시지 전송 실패 - Topic: {topic_name}, Error: {e}")
            raise
    
    async def send_batch_to_queue(self, topic_name: str, 
                                 messages: List[Dict[str, Any]]) -> List[str]:
        """배치 메시지 전송"""
        message_ids = []
        for content in messages:
            message_id = await self.send_to_queue(topic_name, content)
            message_ids.append(message_id)
        return message_ids
    
    def close(self):
        """프로듀서 종료"""
        if self.producer:
            self.producer.close()
            logger.info("Kafka 프로듀서 종료")

class KafkaQueueConsumer:
    """
    Kafka를 활용한 Queue 패턴 컨슈머
    - 컨슈머 그룹을 통한 메시지 분산 처리
    - 자동 오프셋 커밋으로 중복 처리 방지
    """
    
    def __init__(self, topic_name: str, consumer_group: str, consumer_id: str,
                 bootstrap_servers: str = 'localhost:9092'):
        self.topic_name = topic_name
        self.consumer_group = consumer_group
        self.consumer_id = consumer_id
        self.bootstrap_servers = bootstrap_servers
        self.consumer = None
        self.is_running = False
        self.message_handler: Optional[Callable] = None
        self.processed_count = 0
        self.error_count = 0
        
        self._setup_consumer()
    
    def _setup_consumer(self):
        """Kafka 컨슈머 설정"""
        try:
            self.consumer = KafkaConsumer(
                self.topic_name,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.consumer_group,
                client_id=self.consumer_id,
                # 자동 오프셋 커밋으로 at-least-once 보장
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                # 가장 오래된 메시지부터 소비
                auto_offset_reset='earliest',
                # 세션 타임아웃 설정
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000,
                # 메시지 크기 제한
                max_partition_fetch_bytes=1048576,  # 1MB
                fetch_max_wait_ms=500
            )
            logger.info(f"Kafka 컨슈머 초기화 완료 - Group: {self.consumer_group}, "
                       f"Consumer: {self.consumer_id}")
        except Exception as e:
            logger.error(f"Kafka 컨슈머 초기화 실패: {e}")
            raise
    
    async def start_consuming(self, message_handler: Callable[[KafkaMessage], bool]):
        """
        메시지 소비 시작
        Args:
            message_handler: 메시지 처리 함수 (KafkaMessage -> bool)
        """
        self.message_handler = message_handler
        self.is_running = True
        
        logger.info(f"Queue 컨슈머 시작 - Consumer: {self.consumer_id}")
        
        try:
            while self.is_running:
                # 메시지 폴링 (타임아웃 1초)
                msg_pack = self.consumer.poll(timeout_ms=1000)
                
                if not msg_pack:
                    continue
                
                # 수신된 메시지 처리
                for topic_partition, messages in msg_pack.items():
                    for msg in messages:
                        try:
                            kafka_message = KafkaMessage.from_kafka_message(msg)
                            
                            logger.info(f"메시지 수신 - Consumer: {self.consumer_id}, "
                                       f"ID: {kafka_message.id}, "
                                       f"Partition: {kafka_message.partition}, "
                                       f"Offset: {kafka_message.offset}")
                            
                            # 메시지 처리
                            success = await self._process_message(kafka_message)
                            
                            if success:
                                self.processed_count += 1
                            else:
                                self.error_count += 1
                                
                        except Exception as e:
                            self.error_count += 1
                            logger.error(f"메시지 처리 오류 - Consumer: {self.consumer_id}, "
                                       f"Error: {e}")
                
        except Exception as e:
            logger.error(f"컨슈머 실행 오류 - Consumer: {self.consumer_id}, Error: {e}")
        finally:
            self.close()
    
    async def _process_message(self, message: KafkaMessage) -> bool:
        """메시지 처리"""
        try:
            if self.message_handler:
                # 동기 함수인 경우 스레드 풀에서 실행
                if asyncio.iscoroutinefunction(self.message_handler):
                    result = await self.message_handler(message)
                else:
                    loop = asyncio.get_event_loop()
                    result = await loop.run_in_executor(None, self.message_handler, message)
                
                logger.info(f"메시지 처리 완료 - Consumer: {self.consumer_id}, "
                           f"Message: {message.id}")
                return result
            return True
            
        except Exception as e:
            logger.error(f"메시지 처리 실패 - Consumer: {self.consumer_id}, "
                        f"Message: {message.id}, Error: {e}")
            return False
    
    def stop(self):
        """컨슈머 정지"""
        self.is_running = False
        logger.info(f"Queue 컨슈머 정지 요청 - Consumer: {self.consumer_id}")
    
    def close(self):
        """컨슈머 종료"""
        if self.consumer:
            self.consumer.close()
            logger.info(f"Kafka 컨슈머 종료 - Consumer: {self.consumer_id}")
    
    def get_stats(self) -> Dict:
        """컨슈머 통계"""
        return {
            "consumer_id": self.consumer_id,
            "consumer_group": self.consumer_group,
            "topic_name": self.topic_name,
            "processed_count": self.processed_count,
            "error_count": self.error_count,
            "is_running": self.is_running
        }

class KafkaTopicProducer:
    """
    Kafka를 활용한 Topic 패턴 프로듀서 (Publish-Subscribe)
    - 다중 파티션으로 확장성 제공
    - 다중 컨슈머 그룹에 메시지 브로드캐스트
    """
    
    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self._setup_producer()
    
    def _setup_producer(self):
        """Kafka 프로듀서 설정"""
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                key_serializer=lambda x: x.encode('utf-8') if x else None,
                # 처리량 최적화를 위한 설정
                acks=1,  # 리더만 확인 (성능 vs 안정성)
                compression_type='snappy',  # 압축으로 네트워크 효율성
                batch_size=16384,  # 배치 크기
                linger_ms=5,  # 배치 대기 시간
                retries=3
            )
            logger.info("Kafka Topic 프로듀서 초기화 완료")
        except Exception as e:
            logger.error(f"Kafka Topic 프로듀서 초기화 실패: {e}")
            raise
    
    async def publish_to_topic(self, topic_name: str, content: Dict[str, Any],
                              key: Optional[str] = None, 
                              headers: Optional[Dict] = None) -> str:
        """
        Topic에 메시지 발행
        Args:
            topic_name: 토픽 이름
            content: 메시지 내용
            key: 파티션 키
            headers: 메시지 헤더
        Returns:
            str: 메시지 ID
        """
        message_id = str(uuid.uuid4())
        message_data = {
            'id': message_id,
            'content': content,
            'timestamp': datetime.now().isoformat(),
            'headers': headers or {}
        }
        
        try:
            # 헤더 변환 (Kafka는 바이트 헤더만 지원)
            kafka_headers = []
            if headers:
                kafka_headers = [(k, json.dumps(v).encode('utf-8')) 
                               for k, v in headers.items()]
            
            # 비동기 전송 (파티션은 키에 따라 자동 결정)
            future = self.producer.send(
                topic_name,
                value=message_data,
                key=key,
                headers=kafka_headers
            )
            
            # 전송 결과 대기
            record_metadata = future.get(timeout=10)
            
            logger.info(f"Topic 메시지 발행 완료 - Topic: {topic_name}, "
                       f"ID: {message_id}, Partition: {record_metadata.partition}, "
                       f"Offset: {record_metadata.offset}")
            
            return message_id
            
        except Exception as e:
            logger.error(f"Topic 메시지 발행 실패 - Topic: {topic_name}, Error: {e}")
            raise
    
    async def publish_batch_to_topic(self, topic_name: str, 
                                    messages: List[Dict[str, Any]]) -> List[str]:
        """배치 메시지 발행"""
        message_ids = []
        for content in messages:
            message_id = await self.publish_to_topic(topic_name, content)
            message_ids.append(message_id)
        
        # 배치 전송 완료 대기
        self.producer.flush()
        return message_ids
    
    def close(self):
        """프로듀서 종료"""
        if self.producer:
            self.producer.close()
            logger.info("Kafka Topic 프로듀서 종료")

class KafkaTopicSubscriber:
    """
    Kafka를 활용한 Topic 패턴 구독자
    - 독립적인 컨슈머 그룹으로 모든 메시지 수신
    - 다중 파티션에서 병렬 처리
    """
    
    def __init__(self, topic_name: str, subscriber_group: str, subscriber_id: str,
                 bootstrap_servers: str = 'localhost:9092'):
        self.topic_name = topic_name
        self.subscriber_group = subscriber_group
        self.subscriber_id = subscriber_id
        self.bootstrap_servers = bootstrap_servers
        self.consumer = None
        self.is_running = False
        self.message_handler: Optional[Callable] = None
        self.processed_count = 0
        self.error_count = 0
        
        self._setup_consumer()
    
    def _setup_consumer(self):
        """Kafka 컨슈머 설정"""
        try:
            self.consumer = KafkaConsumer(
                self.topic_name,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.subscriber_group,  # 각 구독자는 별도 그룹
                client_id=self.subscriber_id,
                enable_auto_commit=True,
                auto_commit_interval_ms=1000,
                # 최신 메시지부터 소비 (실시간 스트리밍용)
                auto_offset_reset='latest',
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000,
                # 처리량 최적화
                fetch_min_bytes=1024,
                fetch_max_wait_ms=500,
                max_partition_fetch_bytes=1048576
            )
            logger.info(f"Kafka Topic 구독자 초기화 완료 - Group: {self.subscriber_group}, "
                       f"Subscriber: {self.subscriber_id}")
        except Exception as e:
            logger.error(f"Kafka Topic 구독자 초기화 실패: {e}")
            raise
    
    async def start_subscribing(self, message_handler: Callable[[KafkaMessage], bool]):
        """
        토픽 구독 시작
        Args:
            message_handler: 메시지 처리 함수
        """
        self.message_handler = message_handler
        self.is_running = True
        
        logger.info(f"Topic 구독자 시작 - Subscriber: {self.subscriber_id}")
        
        try:
            while self.is_running:
                msg_pack = self.consumer.poll(timeout_ms=1000)
                
                if not msg_pack:
                    continue
                
                # 수신된 메시지 처리 (다중 파티션)
                for topic_partition, messages in msg_pack.items():
                    for msg in messages:
                        try:
                            kafka_message = KafkaMessage.from_kafka_message(msg)
                            
                            logger.info(f"메시지 수신 - Subscriber: {self.subscriber_id}, "
                                       f"ID: {kafka_message.id}, "
                                       f"Partition: {kafka_message.partition}")
                            
                            # 메시지 처리
                            success = await self._process_message(kafka_message)
                            
                            if success:
                                self.processed_count += 1
                            else:
                                self.error_count += 1
                                
                        except Exception as e:
                            self.error_count += 1
                            logger.error(f"메시지 처리 오류 - Subscriber: {self.subscriber_id}, "
                                       f"Error: {e}")
                
        except Exception as e:
            logger.error(f"구독자 실행 오류 - Subscriber: {self.subscriber_id}, Error: {e}")
        finally:
            self.close()
    
    async def _process_message(self, message: KafkaMessage) -> bool:
        """메시지 처리"""
        try:
            if self.message_handler:
                if asyncio.iscoroutinefunction(self.message_handler):
                    result = await self.message_handler(message)
                else:
                    loop = asyncio.get_event_loop()
                    result = await loop.run_in_executor(None, self.message_handler, message)
                
                logger.info(f"메시지 처리 완료 - Subscriber: {self.subscriber_id}, "
                           f"Message: {message.id}")
                return result
            return True
            
        except Exception as e:
            logger.error(f"메시지 처리 실패 - Subscriber: {self.subscriber_id}, "
                        f"Message: {message.id}, Error: {e}")
            return False
    
    def stop(self):
        """구독자 정지"""
        self.is_running = False
        logger.info(f"Topic 구독자 정지 요청 - Subscriber: {self.subscriber_id}")
    
    def close(self):
        """구독자 종료"""
        if self.consumer:
            self.consumer.close()
            logger.info(f"Kafka Topic 구독자 종료 - Subscriber: {self.subscriber_id}")
    
    def get_stats(self) -> Dict:
        """구독자 통계"""
        return {
            "subscriber_id": self.subscriber_id,
            "subscriber_group": self.subscriber_group,
            "topic_name": self.topic_name,
            "processed_count": self.processed_count,
            "error_count": self.error_count,
            "is_running": self.is_running
        }

class KafkaTopicManager:
    """Kafka 토픽 관리 클래스"""
    
    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='topic_manager'
        )
    
    def create_topic(self, topic_name: str, num_partitions: int = 1, 
                    replication_factor: int = 1) -> bool:
        """토픽 생성"""
        try:
            topic = NewTopic(
                name=topic_name,
                num_partitions=num_partitions,
                replication_factor=replication_factor
            )
            
            self.admin_client.create_topics([topic])
            logger.info(f"토픽 생성 완료 - Name: {topic_name}, "
                       f"Partitions: {num_partitions}")
            return True
            
        except TopicAlreadyExistsError:
            logger.info(f"토픽이 이미 존재함 - Name: {topic_name}")
            return True
        except Exception as e:
            logger.error(f"토픽 생성 실패 - Name: {topic_name}, Error: {e}")
            return False
    
    def close(self):
        """어드민 클라이언트 종료"""
        self.admin_client.close()

# 메시지 핸들러 예시들
async def order_queue_handler(message: KafkaMessage) -> bool:
    """주문 큐 처리 핸들러"""
    try:
        order_data = message.content
        order_id = order_data.get('order_id')
        customer_id = order_data.get('customer_id')
        amount = order_data.get('amount')
        
        logger.info(f"[주문 처리] Order: {order_id}, Customer: {customer_id}, "
                   f"Amount: {amount}")
        
        # 주문 처리 시뮬레이션
        await asyncio.sleep(0.1)
        return True
        
    except Exception as e:
        logger.error(f"주문 처리 실패: {e}")
        return False

async def inventory_topic_handler(message: KafkaMessage) -> bool:
    """재고 토픽 구독 핸들러"""
    try:
        event_data = message.content
        if event_data.get('event_type') == 'order_completed':
            order_id = event_data.get('order_id')
            items = event_data.get('items', [])
            
            logger.info(f"[재고 서비스] 주문 {order_id}에 대한 재고 차감: {items}")
            await asyncio.sleep(0.05)
        
        return True
        
    except Exception as e:
        logger.error(f"재고 처리 실패: {e}")
        return False

async def notification_topic_handler(message: KafkaMessage) -> bool:
    """알림 토픽 구독 핸들러"""
    try:
        event_data = message.content
        if event_data.get('event_type') == 'order_completed':
            customer_id = event_data.get('customer_id')
            order_id = event_data.get('order_id')
            
            logger.info(f"[알림 서비스] 고객 {customer_id}에게 주문 {order_id} 완료 알림 발송")
            await asyncio.sleep(0.02)
        
        return True
        
    except Exception as e:
        logger.error(f"알림 처리 실패: {e}")
        return False

# 사용 예시
async def kafka_queue_example():
    """Kafka Queue 패턴 예시"""
    topic_manager = KafkaTopicManager()
    
    # 큐용 토픽 생성 (단일 파티션)
    queue_topic = "order_processing_queue"
    topic_manager.create_topic(queue_topic, num_partitions=1)
    
    # 프로듀서 생성
    producer = KafkaQueueProducer()
    
    # 컨슈머 생성 (같은 그룹 - 작업 분산)
    consumer_group = "order_workers"
    consumer1 = KafkaQueueConsumer(queue_topic, consumer_group, "worker_1")
    consumer2 = KafkaQueueConsumer(queue_topic, consumer_group, "worker_2")
    consumer3 = KafkaQueueConsumer(queue_topic, consumer_group, "worker_3")
    
    try:
        # 컨슈머 시작 (백그라운드)
        consumer_tasks = [
            asyncio.create_task(consumer1.start_consuming(order_queue_handler)),
            asyncio.create_task(consumer2.start_consuming(order_queue_handler)),
            asyncio.create_task(consumer3.start_consuming(order_queue_handler))
        ]
        
        logger.info("=== Kafka Queue 패턴 시작 ===")
        
        # 주문 메시지 전송
        orders = [
            {
                "order_id": f"ORD-{i:03d}",
                "customer_id": f"CUST-{i%5:03d}",
                "amount": 1000 + i*100,
                "items": [f"item-{i}"]
            }
            for i in range(1, 11)
        ]
        
        await producer.send_batch_to_queue(queue_topic, orders)
        
        # 처리 완료 대기
        await asyncio.sleep(5)
        
        # 통계 출력
        logger.info("=== Queue 처리 결과 ===")
        for consumer in [consumer1, consumer2, consumer3]:
            stats = consumer.get_stats()
            logger.info(f"Consumer {stats['consumer_id']}: "
                       f"처리됨={stats['processed_count']}, "
                       f"에러={stats['error_count']}")
        
    finally:
        # 정리
        consumer1.stop()
        consumer2.stop()
        consumer3.stop()
        
        for task in consumer_tasks:
            task.cancel()
        
        producer.close()
        topic_manager.close()

async def kafka_topic_example():
    """Kafka Topic 패턴 예시"""
    topic_manager = KafkaTopicManager()
    
    # 토픽용 토픽 생성 (다중 파티션)
    event_topic = "order_events"
    topic_manager.create_topic(event_topic, num_partitions=3)
    
    # 프로듀서 생성
    producer = KafkaTopicProducer()
    
    # 구독자 생성 (각기 다른 그룹 - 모든 메시지 수신)
    inventory_subscriber = KafkaTopicSubscriber(
        event_topic, "inventory_service", "inventory_1"
    )
    notification_subscriber = KafkaTopicSubscriber(
        event_topic, "notification_service", "notification_1"
    )
    
    try:
        # 구독자 시작 (백그라운드)
        subscriber_tasks = [
            asyncio.create_task(
                inventory_subscriber.start_subscribing(inventory_topic_handler)
            ),
            asyncio.create_task(
                notification_subscriber.start_subscribing(notification_topic_handler)
            )
        ]
        
        logger.info("=== Kafka Topic 패턴 시작 ===")
        
        # 잠시 대기 (구독자 준비 시간)
        await asyncio.sleep(2)
        
        # 이벤트 메시지 발행
        events = [
            {
                "event_type": "order_completed",
                "order_id": f"ORD-{i:03d}",
                "customer_id": f"CUST-{i%3:03d}",
                "amount": 1500 + i*200,
                "items": [f"item-{i}", f"item-{i+5}"]
            }
            for i in range(1, 6)
        ]
        
        await producer.publish_batch_to_topic(event_topic, events)
        
        # 처리 완료 대기
        await asyncio.sleep(5)
        
        # 통계 출력
        logger.info("=== Topic 처리 결과 ===")
        for subscriber in [inventory_subscriber, notification_subscriber]:
            stats = subscriber.get_stats()
            logger.info(f"Subscriber {stats['subscriber_id']}: "
                       f"처리됨={stats['processed_count']}, "
                       f"에러={stats['error_count']}")
        
    finally:
        # 정리
        inventory_subscriber.stop()
        notification_subscriber.stop()
        
        for task in subscriber_tasks:
            task.cancel()
        
        producer.close()
        topic_manager.close()

# 메인 실행 함수
async def main():
    """메인 실행 함수"""
    
    def signal_handler(signum, frame):
        logger.info("프로그램 종료 신호 받음")
        sys.exit(0)
    
    # 시그널 핸들러 등록
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    try:
        logger.info("Kafka 예시 시작")
        
        # Queue 패턴 예시 실행
        await kafka_queue_example()
        
        await asyncio.sleep(2)
        
        # Topic 패턴 예시 실행
        await kafka_topic_example()
        
        logger.info("Kafka 예시 완료")
        
    except Exception as e:
        logger.error(f"실행 중 오류 발생: {e}")
    
    logger.info("프로그램 종료")

if __name__ == "__main__":
    asyncio.run(main())

사례 3: 주문 처리 시스템

시스템 구성:

구성 요소설명
Queueorder_queue: 작업 (Task) 전용. FIFO 처리 보장.
Exchangeevents: fanout 타입. 모든 구독자에게 메시지를 broadcast.
Producer주문 생성 시스템. Queue 에 작업 메시지 전송 또는 이벤트 발행.
Consumer주문 처리 시스템. Queue 에서 작업 수신 후 처리.
Subscriber이벤트 처리 시스템들. 이벤트를 받아 로그 기록, 알림 등 수행.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
┌───────────────┐
│  Order Client │
└──────┬────────┘
┌────────────────────┐
│   RabbitMQ Broker  │
│ ┌───────────────┐ │
│ │ Queue: order  │ │◄───── Consumer (Order Processor)
│ └───────────────┘ │
│ ┌───────────────┐ │
│ │ Exchange:     │ │
│ │   events(fan) │ │◄───── Multiple Subscribers
│ └───────────────┘ │
└────────────────────┘
  • Queue 방식 (Point-to-Point):
    • order_queue 라는 이름의 큐에 메시지를 하나의 소비자가 처리.
    • FIFO 기반으로 작업 (Task) 처리가 적합.
  • Topic 방식 (Fanout Exchange 사용):
    • events 라는 fanout exchange 를 통해 여러 소비자에게 동시에 브로드캐스트.
    • 이벤트 알림/상태 변경 전파에 적합.

메시지 흐름 (Workflow):

  • Queue 기반 (작업 처리용)
    1. Producer (producer_queue.py):
      • order_queue"order_id": 12345, "status": "NEW" 메시지 전송.
    2. Consumer (consumer_queue.py):
      • 큐에 메시지가 도착하면 한 명의 소비자가 처리.
      • 메시지는 큐에서 제거됨.
  • Topic 기반 (이벤트 브로드캐스트용)
    1. Producer (producer_topic.py):
      • events exchange 에 "event": "ORDER_CREATED" 메시지 전송.
      • fanout 타입으로, 라우팅 키 없이 모든 바인딩된 큐에 전송.
    2. Subscriber (subscriber_topic.py):
      • 빈 queue 선언 후 events exchange 에 바인딩.
      • 동일한 이벤트를 여러 구독자들이 동시에 수신 가능.

구현 예시:

  1. Queue 예시 (Point-to-Point)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    # producer_queue.py
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='order_queue')
    
    # 주문 메시지 발행
    channel.basic_publish(exchange='',
                          routing_key='order_queue',
                          body='{"order_id": 12345, "status": "NEW"}')
    
    print("Order sent to Queue")
    connection.close()
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    # consumer_queue.py
    import pika
    
    def callback(ch, method, properties, body):
        print(f"[x] Received Order: {body}")
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='order_queue')
    channel.basic_consume(queue='order_queue', on_message_callback=callback, auto_ack=True)
    
    print("Waiting for orders…")
    channel.start_consuming()
    
  2. Topic 예시 (Publish-Subscribe)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    # producer_topic.py
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # Topic exchange 선언
    channel.exchange_declare(exchange='events', exchange_type='fanout')
    
    # 이벤트 브로드캐스트
    channel.basic_publish(exchange='events',
                          routing_key='',
                          body='{"event": "ORDER_CREATED"}')
    print("Event broadcasted")
    connection.close()
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    # subscriber_topic.py
    import pika
    
    def callback(ch, method, properties, body):
        print(f"[x] Received Event: {body}")
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 이벤트 exchange 선언
    channel.exchange_declare(exchange='events', exchange_type='fanout')
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    
    # 이벤트 구독
    channel.queue_bind(exchange='events', queue=queue_name)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    print("Waiting for events…")
    channel.start_consuming()
    

사례 4: 사용자 알림 시스템 (Kafka Topic 기반)

앱에서 사용자 이벤트가 발생했을 때, 여러 컴포넌트가 이를 실시간으로 반응해야 하는 구조 구성

시스템 구성:

  • Producer: 사용자 앱에서 행동 발생 시 이벤트 발행
  • Topic: Kafka Topic–user-events
  • Consumers:
    • Notification Service (푸시알림)
    • Logging Service (감사 기록)
    • Analytics Engine (실시간 통계)
flowchart TD
  App[Mobile App] --> KafkaTopic["Kafka Topic: user-events"]
  KafkaTopic --> NotiService[Notification Service]
  KafkaTopic --> LogService[Logging Service]
  KafkaTopic --> AnalyticsEngine[Analytics Engine]

Workflow:

  1. 사용자가 앱에서 특정 행동을 수행함
  2. 이벤트가 Kafka Topic 에 발행됨
  3. 각각의 서비스가 독립적으로 해당 이벤트를 수신하고 자체 처리 수행

Kafka Topic 유무에 따른 차이:

구분존재할 경우존재하지 않을 경우
메시지 처리 구조느슨한 결합, 병렬 분기강결합, 동기 호출 중심
확장성독립적 확장 용이하나의 호출 체인에 의존
유지보수각 서비스 단위 테스트 용이전체 시스템 수정 필요

주목할 내용

카테고리항목설명관련 기술 / 시스템
전달 방식Push vs PullQueue 는 Pull 기반 소비, Topic 은 Push 또는 Hybrid 방식 가능.Kafka (Pull), SNS (Push), MQTT
확장성 설계Partitioning메시지를 물리적으로 분할하여 트래픽을 분산하고 병렬 처리를 통해 성능을 향상시킴.Kafka Partition, SQS Shard
Consumer ScalingConsumer Group 을 통한 병렬 소비 확장. Partition 수와 균형 있게 설계되어야 함.Kafka Consumer Group, Celery Workers
신뢰성/무결성Acknowledgment (Ack)메시지 수신 확인을 통해 손실 방지 및 재전송 제어.Kafka Offset Commit, RabbitMQ Ack
Dead Letter Queue처리 실패 메시지를 보관하여 재처리 가능하도록 함. 장애 분석 및 경고 트리거로도 활용 가능.Kafka DLQ, RabbitMQ DLX
Idempotency중복된 메시지라도 같은 결과를 보장하는 소비자 처리 로직 설계. 중복 이벤트 발생 시 데이터 무결성 유지.Webhook, Event Consumer
처리 전략Offset & Replay메시지의 소비 위치를 관리하여 리플레이, 복기, 재처리 등 고급 처리 전략을 구현 가능.Kafka Offset, Pulsar Cursor
TTL (Time-To-Live)메시지의 생명 주기를 제한하여 오랫동안 소비되지 않는 메시지를 자동으로 정리.SQS Message TTL, Kafka Retention
라우팅/필터링Message RoutingTopic/Exchange 기반의 메시지 라우팅 설계. Routing Key 와 Binding Rule 로 목적지 결정.RabbitMQ Exchange, SNS Topic Filter
Attribute Filtering메시지 속성 기반으로 구독자가 원하는 조건만 필터링 수신.SNS Filter Policy, JMS Selectors
패턴/조합CQRS + Pub/SubCQRS(Command/Query 분리) 구조와 Topic 기반 메시징 결합으로 읽기/쓰기 모델 최적화.Event Sourcing with Kafka
성능 최적화Batch & Compression메시지 묶음 전송과 압축 적용으로 Throughput 향상 및 전송 비용 절감.Kafka Batch Size, GZIP Compression
전송 품질 보장QoS Level메시지 전송 품질 설정: At Most Once / At Least Once / Exactly OnceMQTT QoS, Kafka Delivery Semantics
장애 회복Backpressure Handling소비자가 처리 속도를 초과할 경우, 흐름 제어 및 재시도/대기 메커니즘으로 안정화 필요.Reactive Streams, Kafka Lag Monitor
  • Queue 중심 시스템은 작업 처리 (Task Queue) 용도에 최적이며, Pull 기반, 1:1 처리, Ack/DLQ 중심의 신뢰성 보장 구조.
  • Topic 중심 시스템은 이벤트 브로드캐스트다수 소비자 처리에 유리하며, Push 기반 + 필터링 + 확장성 중심 설계가 필요.
  • Kafka 와 RabbitMQ 는 각각의 특징을 극대화하며, 다양한 **복합 아키텍처 (CQRS, Event Sourcing, Retry Workflow)**에서 병행 사용됨.
  • 실제 아키텍처 설계 시 성능, 확장성, 무결성, 복원력, 필터링, 그리고 QoS 정책까지 종합적으로 고려해야 함.

반드시 학습해야할 내용

카테고리주제핵심 개념 / 항목설명 및 실무 적용 포인트
메시징 모델Queue vs. TopicP2P (1:1), Pub/Sub (1:N)Queue 는 단일 소비자 처리에 적합, Topic 은 다수 구독자에게 브로드캐스트 용도
메시징 프로토콜AMQP, MQTTAMQP = 복잡한 라우팅, MQTT = 경량 통신RabbitMQ → AMQP 기반 / IoT 및 저전력 → MQTT 사용 (예: 센서 데이터)
메시지 보장QoS (At-Least-Once 등)At-Most-Once / At-Least-Once / Exactly-Once메시지 유실/중복 방지 및 시스템 신뢰성 확보. Kafka 는 At-least-once 기본, Exactly-once 지원 필요 시 구성 필요
신뢰성/순서Ack, Offset, Delivery SemanticsAuto/Manual Ack, Kafka Offset메시지 누락 방지, 재처리 가능성 확보. Ack 와 Offset 을 통해 중복 처리 방지 및 순서 보장
확장성 설계PartitioningKafka, RabbitMQ 의 파티션 설계병렬 소비 및 부하 분산을 위한 핵심 요소. 파티션 수 결정 → Throughput 결정됨
에러 처리DLQ (Dead Letter Queue)처리 실패 메시지 격리재처리, 로깅, 경고를 위한 DLQ 구성 필수. Kafka, RabbitMQ, AWS SQS 모두 DLQ 전략 존재
백프레셔BackpressureKafka max.poll.records, RabbitMQ prefetch소비자 처리 속도 제어 및 생산자 과부하 방지. 처리 병목 구간 진단 및 트래픽 제어
데이터 포맷Avro, Protobuf, JSON직렬화 포맷 선택Avro/Protobuf → 이진 포맷, 속도/용량 우수. JSON → 디버깅 용이하나 느리고 크다
멱등성Idempotency메시지 재처리 안전성같은 메시지 중복 수신 시 결과 일관성 보장. Redis, DB transaction key 활용 등으로 보장 가능
아키텍처 설계Event-Driven ArchitectureLoose Coupling, Scalability, Observability서비스 간 직접 연결 지양. Kafka/NATS 등으로 비동기 이벤트 중심 아키텍처 설계 시 고려해야 할 요소
클라우드 서비스AWS SQS/SNS, Azure, GCP PubSubServerless 메시징 서비스 비교SQS(Queue), SNS(Topic), EventBridge(EDA 연동) → 비용, 확장성, TTL, DLQ, 멱등성 기능 비교 필요
실시간 처리Kafka Streams, Flink메시징 + 실시간 데이터 흐름 통합Kafka Streams → Kafka-native 스트림 처리. Flink → 고급 상태 관리, CEP 등 포함 실시간 분석
DevOps 연계Kubernetes + Kafka / KEDA메시지 기반 오토스케일링메시지 수 (큐 길이) 기반 Horizontal Pod Autoscaler 구현. KEDA + Kafka/RabbitMQ 연동 가능
보안인증, 암호화, 접근제어메시지 암호화, IAM 인증TLS 전송 암호화, JWT 또는 IAM 기반 접근 제어. Kafka 는 ACL 구성, SQS 는 IAM 정책 설정 필수

용어 정리

카테고리용어설명관련 기술/모델
전달 방식Queue (큐)Point-to-Point 기반 구조. 단일 소비자에게 메시지를 순서대로 (FIFO) 전달.RabbitMQ, SQS, Celery
Topic (토픽)Publish-Subscribe 기반 구조. 다수 소비자가 같은 메시지를 수신.Kafka, Pub/Sub, SNS
Exchange메시지를 큐 또는 토픽으로 라우팅하는 라우터 역할. RabbitMQ 의 핵심 컴포넌트.RabbitMQ (Direct/Fanout/Topic/Header)
Virtual Topic큐와 토픽을 조합한 패턴. Pub/Sub 구조 위에 큐 기반 소비자를 추가하여 병렬성 제공.ActiveMQ
처리 전략FIFOFirst-In-First-Out. 메시지를 받은 순서대로 소비자에게 전달.대부분의 큐 시스템에서 기본값
Round-Robin여러 소비자에게 순환 방식으로 메시지를 분배.RabbitMQ Consumer Load Balancing
Fan-out메시지를 여러 수신자에게 복제하여 전달하는 방식.SNS → SQS, Kafka
Partition (파티션)하나의 토픽 또는 큐를 병렬 처리 가능한 단위로 나눈 구조.Kafka Partition
Consumer Group동일 토픽을 병렬로 처리하기 위한 소비자 집합 단위. 각 파티션은 하나의 그룹 내 소비자에게만 전달됨.Kafka
신뢰성 보장Acknowledgment (Ack)메시지 수신/처리 완료를 브로커에 알림으로써 메시지 삭제 또는 재전송 제어.Kafka, RabbitMQ
Idempotency동일 메시지에 대한 중복 처리 방지. 재처리 시 결과가 동일하도록 설계.Kafka Consumer, Webhook 처리 등
Durable Subscription구독자가 오프라인이어도 메시지를 보존하여 수신 가능하도록 하는 옵션.JMS, MQTT
에러 처리DLQ (Dead Letter Queue)처리 실패 메시지를 별도로 저장하여 재처리하거나 로그 확인을 가능하게 하는 큐.RabbitMQ, Kafka, SQS
Backpressure소비자가 처리 가능한 속도보다 빠르게 데이터가 도착할 경우 이를 제어하는 메커니즘.Reactive Streams, Kafka
성능 메트릭Throughput단위 시간당 처리 가능한 메시지 수.Kafka: TPS, RabbitMQ: msg/sec
Latency메시지 생성부터 소비까지 걸리는 시간.전송 시간 + 처리 시간 포함
QoSQoS Level메시지 전달 보장 수준: At-most-once / At-least-once / Exactly-onceMQTT, Kafka
중개자 구성Broker (브로커)메시지를 생산자에서 소비자에게 중계하는 핵심 구성 요소.Kafka, RabbitMQ, Pulsar
Offset소비자의 메시지 읽기 위치를 나타내는 지표. 재처리 또는 순서 보장에 활용.Kafka, Pulsar

참고 및 출처