Event-Driven Architecture

Event‑Driven Architecture(EDA) 는 시스템 내 이벤트 (예: 주문 생성, 결제 완료) 가 브로커/채널을 통해 퍼블리시되고, 이를 구독한 다양한 서비스들이 독립적으로 반응하는 패턴이다. 이 구조는 Pub/Sub 기반 메시지 전송, 이벤트 라우팅, 비동기 처리, 이벤트 저장소, Eventual Consistency 등을 활용해 서비스 간 느슨한 결합과 독립적 확장성을 확보한다. 또한, 복잡한 업무 흐름 (Orchestration vs Choreography) 및 CQRS/Event Sourcing 패턴도 지원하며, 이기종 시스템 통합, IoT, 마이크로서비스, 실시간 분석 등에서 널리 사용된다.

배경

Event-Driven Architecture 는 전통적인 모놀리식 아키텍처의 한계를 극복하기 위해 등장했다. 2000 년대 초반 SOA(Service-Oriented Architecture) 의 발전과 함께 비동기 메시징의 중요성이 대두되었고, 클라우드 컴퓨팅과 마이크로서비스 아키텍처의 확산으로 EDA 가 주목받게 되었다.

역사적 발전:

목적 및 필요성

목적

필요성

핵심 개념

이벤트 기반 아키텍처 (EDA) 는 시스템 내에서 발생하는 이벤트 (상태 변화, 액션 등) 를 중심으로 서비스가 동작하며, 이벤트 생산자 (Producer) 와 소비자 (Consumer) 가 메시지 브로커 (Broker) 를 통해 비동기적으로 통신하는 소프트웨어 설계 패턴이다.

카테고리용어/개념설명
기본 구성 요소Event (이벤트)시스템 내 상태 변화나 사건을 표현하는 불변 메시지 객체. 예: 주문 생성, 결제 완료 등
Event Producer / Publisher이벤트를 생성하고 브로커에 발행하는 컴포넌트. 생산자는 소비자의 존재를 모름
Event Consumer / Subscriber브로커로부터 이벤트를 구독하고 처리하는 컴포넌트
Event Broker / Bus / Channel발행자와 구독자 사이에서 이벤트를 라우팅, 전달, 저장하는 미들웨어 (Kafka, RabbitMQ 등)
Topic / Stream / Queue이벤트의 분류 단위로, 동일한 유형의 이벤트 흐름을 논리적으로 그룹핑
통신 및 구조 원칙Asynchronous Communication이벤트 발행 시 소비자의 응답을 기다리지 않고 독립적으로 처리하는 방식 (비동기 통신)
Loose Coupling (느슨한 결합)컴포넌트 간 직접 의존성을 제거하여 독립적인 배포 및 확장 가능하게 설계
Pub/Sub Pattern이벤트 발행자와 구독자 간 관계를 분리하여 확장성과 유연성 확보
Event Choreography개별 서비스들이 이벤트를 통해 독립적으로 반응하며 전체 워크플로우를 구성 (탈중앙화)
Event Orchestration중앙 오케스트레이터가 이벤트 흐름과 작업 순서를 제어 (중앙화 방식)
일관성 및 상태 관리Eventual Consistency비동기 처리 환경에서 시간이 지나면 전체 시스템이 일관된 상태에 도달하는 특성
Deferred Execution요청 시점과 처리 시점이 분리되어 실행되는 모델
아키텍처 확장 요소Event Sourcing모든 상태 변경을 이벤트로 저장하고 재생을 통해 시스템 상태를 재현하는 패턴
CQRS (Command Query Responsibility Segregation)명령 (쓰기) 과 조회 (읽기) 를 분리하여 성능 및 확장성을 향상시키는 아키텍처 패턴
Event Mesh다수의 이벤트 브로커들을 연결하여 메시지를 전파하는 분산 메시징 네트워크
Hierarchical Topic계층적 구조의 토픽 네이밍을 통해 메시지 필터링과 라우팅을 정교하게 구성 가능
연관 도구 및 메커니즘Schema Registry이벤트 메시지의 스키마를 등록 및 관리하여 형식 일관성과 호환성 유지
Correlation ID이벤트 흐름을 추적하기 위한 고유 식별자. 분산 트레이싱에 사용됨
Dead Letter Queue (DLQ)소비 실패 또는 오류가 발생한 이벤트를 별도로 저장하여 사후 처리 가능하게 함

실무 연관성

카테고리항목설명
1. 적용 분야마이크로서비스 통신이벤트 기반으로 서비스 간 결합도를 낮추고 독립적 배포·확장이 가능하게 함
실시간 데이터 처리실시간 이벤트 스트리밍 기반 분석, 대기시간 없는 사용자 응답 시스템 구현
시스템 통합레거시 시스템과 신규 시스템 간 통신을 이벤트로 구성하여 유연한 연계 구현
장애 복구 및 내결함성이벤트 저장 및 재처리 메커니즘으로 장애 복구 및 데이터 유실 방지 가능
분산 시스템 확장서비스별로 독립적으로 수평 확장 가능, 트래픽 급증에도 유연하게 대응
워크플로우 오케스트레이션Choreography, Saga 패턴을 통해 서비스 간 분산 트랜잭션 관리
2. 구현 요소이벤트 브로커 / 메시지 큐Kafka, RabbitMQ, Amazon EventBridge, Pulsar, Azure Service Bus 등
스키마 관리 및 계약 문서화Confluent Schema Registry, AsyncAPI, 이벤트 포맷 (JSON, Avro 등)
이벤트 저장 및 재생Kafka log-compaction, Event Store, Outbox Pattern 활용
모니터링 및 트레이싱Prometheus, Grafana, Jaeger, OpenTelemetry, ELK Stack
보안 및 인증/인가TLS, OAuth2, JWT, Kafka ACL, IAM 기반 Role 제어
3. 아키텍처 설계 고려사항비동기 이벤트 라우팅토픽 기반 필터링, 와일드카드 구독, 토픽 계층화로 관심사 기반 메시지 수신
이벤트 내구성 및 정합성메시지 전달 보장 설정 (At-least/Exactly-once), 중복 방지 (Idempotency)
버전 관리 및 진화 가능성이벤트 메시지 스키마의 버전 호환성 설계, 호환되지 않는 변경 시 대비
분산 트랜잭션 관리이벤트 기반으로 일관성을 유지하기 위한 SAGA 패턴, 보상 트랜잭션 설계
4. 운영 및 유지보수확장성 및 유연성이벤트 기반 구조는 새로운 서비스 추가/변경 시 기존 구성 요소에 영향 최소화
장애 격리 및 복원력이벤트 기반 비동기 구조는 서비스 간 장애 전파를 막고 자동 복구 구조 구현 가능
계측 및 알림 설정이벤트 흐름, DLQ 비율, 실패율 등을 기반으로 한 알림 및 자동 대응
다양한 프로토콜/플랫폼 연계HTTP, MQTT, gRPC 등 다양한 통신 프로토콜과 연동 가능
테스트 및 시뮬레이션프로듀서/컨슈머 분리 테스트 가능, Mock Broker 를 통한 통합 테스트 유연성 확보

주요 기능 및 역할

주요 기능

  1. Event Publishing (이벤트 발행): 비즈니스 이벤트 생성 및 전파
  2. Event Routing (이벤트 라우팅): 관심 있는 소비자에게 이벤트 전달
  3. Event Processing (이벤트 처리): 수신된 이벤트에 대한 비즈니스 로직 실행
  4. Event Storage (이벤트 저장): 이벤트 이력 및 재처리를 위한 저장

역할

특징

핵심 원칙

  1. 단일 책임 원칙: 각 이벤트는 하나의 명확한 목적을 가짐
  2. 이벤트 불변성: 발행된 이벤트는 변경되지 않음
  3. 멱등성: 동일한 이벤트의 반복 처리가 안전해야 함
  4. 순서 보장: 필요 시 이벤트 처리 순서 보장
  5. 장애 격리: 한 구성 요소의 장애가 전체 시스템에 영향을 주지 않음
  6. 결과적 일관성: 모든 시스템이 최종적으로 일관된 상태를 유지

주요 원리

이벤트 기반 통신

graph TD
    A[Event Producer] -->|Publish Event| B[Event Broker]
    B -->|Route Event| C[Event Consumer 1]
    B -->|Route Event| D[Event Consumer 2]
    B -->|Route Event| E[Event Consumer 3]
    
    subgraph "Event Processing"
        F[Event Store] --> G[Event Stream]
        G --> H[Event Handler]
        H --> I[Business Logic]
    end
    
    B --> F

작동 원리

sequenceDiagram
  participant P as Producer
  participant B as Broker
  participant C as Consumer
  P->>B: Emit "OrderCreated"
  B->>C: Notify event
  C->>C: Process order
  alt Processing error
    C->>B: Send to DLQ
  end
  note over B,C: 메시징 내구성 & 중복 방지

설명:

  1. Producer 가 이벤트를 브로커에 발행
  2. Broker 는 이벤트 저장 및 메시징
  3. Consumer 가 이벤트를 수신 후 처리
  4. 오류 발생 시 DLQ 또는 재시도 처리

구조 및 아키텍처

flowchart TD
  subgraph Producer Layer
    P[Online Service, IoT Device, DB Trigger]
  end
  subgraph Broker Layer
    B[Event Broker: Kafka / RabbitMQ / EventBridge]
    Topic[Topic / Channel]
  end
  subgraph Consumer Layer
    C1["Service A (Payment)"]
    C2["Service B (Inventory)"]
    C3["Service C (Notification)"]
  end
  subgraph Infrastructure
    Store[Event Store / Audit Log]
    Orchestrator[Saga / Orchestration]
    Monitor[Monitoring & Tracing]
  end

  P --> B
  B --> |topic based| C1
  B --> |topic based| C2
  B --> |topic based| C3
  C1 --> B
  C2 --> B
  C3 --> B
  B --> Store
  Orchestrator --> B
  Monitor --> B

구성 요소

구분구성 요소기능/역할필수/선택
이벤트 브로커Kafka, RabbitMQ, AWS EventBridge 등이벤트 게시/구독, 라우팅, 내구성 보장필수
프로듀서애플리케이션, DB 트리거, IoT 디바이스 등이벤트 생성 및 게시필수
컨슈머처리 서비스 (확장 가능)이벤트 수신 및 처리 로직 수행필수
이벤트 저장소Event Store, Kafka log-compacted영속적 이력 저장, 재생 기능 제공선택
오케스트레이션Saga Coordinator, Orchestrator 서비스분산 트랜잭션 및 워크플로우 관리선택
모니터링 · 로깅Jaeger, Prometheus, ELK, OpenTelemetry이벤트 흐름 가시화 및 진단선택
메시지 포맷/계약JSON, Avro, Protobuf; Schema Registry이벤트 구조 정의 및 변경 관리권장

구현 기법

카테고리구현 기법정의 및 구성 요소목적 및 활용 목표대표 예시 시나리오
1. 메시지 전달 구조Pub/Sub (발행/구독)Producer → Topic → 다수 ConsumerMessage Broker (Kafka, RabbitMQ 등)느슨한 결합, 브로드캐스트, 동적 구독 지원주문 생성 시 알림, 재고 차감, 분석 시스템에 동시 전달
Point-to-Point (Queue)Producer → Queue → 단일 Consumer (or competing group)일대일 처리, 워크 분산작업 큐 기반 주문 처리 시스템
Message Broker 기반 모델Producer → Exchange → Queue → Consumer (RabbitMQ 등)라우팅, 필터링, 일관된 메시지 흐름 구현Direct/Topic 기반 라우팅, DLQ 구성
2. 상태 저장 및 동기화Event Sourcing상태 변경을 이벤트로 기록, Event Store, Aggregate, Replay 등 구성감사 추적, 시점 복구, 영속성 확보금융 거래, 계좌 잔액 계산, 로그 재처리 시스템
CQRS명령 (Command) 과 조회 (Query) 모델 분리이벤트 버스/리드 모델 분리성능 향상, 확장성, 조회 최적화주문 서비스에서 Command: 생성/취소, Query: 리포트 조회
Outbox PatternDB 트랜잭션과 이벤트 발행 분리 Outbox 테이블 → 메시지 브로커 전송DB 정합성 보장, 재처리 용이성주문 저장 후 이벤트 발행을 Outbox → Kafka 로 비동기 전송
CDC (Change Data Capture)DB 변경 감지 후 이벤트 생성 (Debezium 등 활용)기존 DB 기반 시스템에 이벤트 도입RDB 변경 사항을 Kafka 이벤트로 생성하여 실시간 처리
3. 워크플로우 처리 패턴Saga Pattern분산 트랜잭션 처리: Orchestrator or Choreography + Compensating Transaction분산 환경에서 일관성 보장주문 → 결제 → 배송 단계 실패 시 보상 로직으로 롤백
SEDA이벤트 처리 단계별 Queue 로 분리, 비동기 처리 및 리소스 분산병렬성 향상, 부하 제어, 스로틀링 적용 가능대규모 트래픽 처리에서 처리량 유지를 위한 스테이지 분리
Complex Event Processing패턴 매칭, 시계열 분석, 복합 이벤트 탐지 엔진 (Flink, Esper 등)실시간 인사이트, 복합 조건 처리이상 행동 탐지, 실시간 Fraud Detection
4. 배포 구조Broker Topology중재자 없이 브로커 ↔ 프로세서 간 직접 연결 (Chain 구조)단순 구조, 고성능, 직렬화 흐름 처리주문 생성 → 처리 → 알림 프로세서 순차 처리
Mediator Topology중앙 Orchestrator 가 모든 이벤트 흐름 조정워크플로우 기반 이벤트 제어명시적 제어, 보상 트랜잭션, 장애 관리중앙 이벤트 허브가 각 서비스에 이벤트 순서대로 전파
Event Mesh지역/클라우드 간 브로커 네트워크 (Solace, Kafka Connect 등)글로벌 이벤트 중계, 이기종 시스템 연동AWS ↔ Kafka ↔ 엣지 디바이스 연계
5. 클라우드 기반 처리Serverless Event HandlingSNS → SQS → Lambda (or Event Grid + Function)인프라 관리 최소화, 비용 효율성사용자가 이벤트 발생 시 자동 알림 또는 비즈니스 로직 실행
Event Streaming PlatformKafka/Pulsar 기반 실시간 이벤트 스트림 처리 Partition, Consumer Group 등 구성고성능, 고가용성, 재생 가능클릭 로그, 사용자 행동 분석, 실시간 추천 시스템

Publish-Subscribe 패턴

정의: 발행자가 구독자에게 직접 메시지를 전송하지 않고 중간 브로커를 통해 전달하는 기법

구성:

목적: 느슨한 결합, 동적 구독, 확장성

실제 예시:

1
2
3
4
뉴스 구독 시스템:
- Publisher: 뉴스 에디터
- Topic: "스포츠", "정치", "경제"
- Subscriber: 모바일 앱, 웹사이트, 이메일 서비스

Event Sourcing

정의: 모든 상태 변화를 이벤트로 저장하여 애플리케이션 상태를 관리하는 기법

구성:

목적: 완전한 감사 추적, 시점 복구, 복잡한 비즈니스 로직 처리

실제 예시:

CQRS (Command Query Responsibility Segregation)

정의: 명령 (쓰기) 과 조회 (읽기) 모델을 분리하여 성능과 확장성을 향상시키는 기법

구성:

목적: 읽기/쓰기 워크로드 최적화, 독립적 확장, 복잡한 조회 지원

실제 예시:

1
2
3
주문 처리 시스템:
- Command: 주문 생성, 수정, 취소
- Query: 주문 조회, 통계, 리포트

Saga 패턴

정의: 분산 트랜잭션을 일련의 로컬 트랜잭션으로 분해하여 관리하는 기법

구성:

목적: 분산 시스템에서 데이터 일관성 보장

실제 예시:

1
2
3
온라인 쇼핑 주문 처리:
1. 재고 확인 → 2. 결제 처리 → 3. 배송 준비
실패 시: 보상 액션으로 이전 단계 롤백

배포 구조

Broker Topology (브로커 토폴로지)

정의: 중앙 중재자 없이 이벤트 프로세서들이 체인 형태로 연결된 구조

구성:

목적: 단순한 이벤트 플로우에서 높은 성능과 확장성 제공

실제 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Node.js를 사용한 브로커 토폴로지 구현
const EventEmitter = require('events');

class EventBroker extends EventEmitter {
    constructor() {
        super();
        this.processors = [];
    }
    
    registerProcessor(processor) {
        this.processors.push(processor);
    }
    
    publishEvent(eventType, eventData) {
        console.log(`Publishing event: ${eventType}`);
        this.emit(eventType, eventData);
    }
}

class OrderProcessor {
    constructor(broker) {
        this.broker = broker;
        this.broker.on('orderPlaced', this.handleOrderPlaced.bind(this));
    }
    
    handleOrderPlaced(orderData) {
        console.log(`Processing order: ${orderData.orderId}`);
        // 주문 처리 로직
        this.broker.publishEvent('orderProcessed', {
            orderId: orderData.orderId,
            status: 'processed'
        });
    }
}

// 사용 예시
const broker = new EventBroker();
const orderProcessor = new OrderProcessor(broker);

broker.publishEvent('orderPlaced', { orderId: '12345', amount: 100 });
Mediator Topology (중재자 토폴로지)

정의: 중앙 이벤트 중재자가 복잡한 워크플로우를 조정하는 구조

구성:

목적: 복잡한 비즈니스 프로세스의 오케스트레이션 및 오류 처리

실제 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 중재자 토폴로지 구현
class EventMediator {
    constructor() {
        this.workflows = new Map();
        this.processors = new Map();
    }
    
    registerWorkflow(eventType, steps) {
        this.workflows.set(eventType, steps);
    }
    
    registerProcessor(processorType, processor) {
        this.processors.set(processorType, processor);
    }
    
    async processEvent(eventType, eventData) {
        const workflow = this.workflows.get(eventType);
        if (!workflow) {
            throw new Error(`No workflow found for event type: ${eventType}`);
        }
        
        const context = { eventData, steps: [] };
        
        for (const step of workflow) {
            try {
                const processor = this.processors.get(step.processorType);
                const result = await processor.process(step, context);
                context.steps.push({ step: step.name, result, status: 'success' });
            } catch (error) {
                context.steps.push({ step: step.name, error, status: 'failed' });
                await this.handleFailure(workflow, context, step);
                break;
            }
        }
        
        return context;
    }
    
    async handleFailure(workflow, context, failedStep) {
        // 보상 트랜잭션 실행
        const completedSteps = context.steps.filter(s => s.status === 'success');
        for (const completedStep of completedSteps.reverse()) {
            try {
                const processor = this.processors.get(completedStep.processorType);
                await processor.compensate(completedStep, context);
            } catch (compensationError) {
                console.error('Compensation failed:', compensationError);
            }
        }
    }
}

장점

카테고리항목설명기술적 근거 / 실무 적합성
1. 확장성 (Scalability)수평 확장 용이성컴포넌트별 독립적인 확장이 가능하여, 컨슈머 인스턴스를 트래픽에 따라 유동적으로 조정 가능Kafka, RabbitMQ 등의 소비자 그룹 구조는 수평 확장을 기본적으로 지원
리소스 효율성이벤트 중심의 비동기 처리로, 불필요한 자원 낭비 없이 필요한 순간만 처리 가능Polling 제거 → 온디맨드 소비 기반으로 리소스 절약 가능
2. 실시간성 (Real-Time Responsiveness)즉시 반응성이벤트가 발생하는 즉시 처리 및 전파 가능 → 빠른 사용자 경험, 알림 및 반응 시스템 구축에 적합Kafka Streams, Flink, Lambda 등의 실시간 스트리밍 구조와 호환됨
낮은 지연 시간비동기 큐 기반 처리는 블로킹 없이 빠르게 응답 가능하여 전체 시스템 지연 시간 감소비동기 I/O 기반 이벤트 처리 구조 적용 가능
3. 느슨한 결합 (Loose Coupling)디커플링 구조Producer 와 Consumer 는 서로 직접 참조하지 않으며, 메시지 브로커를 통해 연결됨 → 독립성 증가API 직접 호출 대신 이벤트 메시지 전송 → 구조 분해 및 유연성 향상
독립 개발·배포 가능컴포넌트 간 의존성이 낮아 팀 간 병렬 개발, 독립적 배포가 가능마이크로서비스 분리 개발에 필수적인 전제 조건
4. 유연성 (Flexibility)서비스 변경 대응 용이새로운 소비자 추가 또는 기존 처리 로직 변경 시 기존 시스템에 영향 없이 확장 가능이벤트 중심 구조이므로 신규 구독자는 메시지를 구독만 하면 됨
이기종 통합 지원서로 다른 언어, 시스템, 프로토콜을 사용하는 시스템 간 통합이 쉬움Kafka, Pulsar 등의 다양한 클라이언트 언어 SDK 존재
5. 장애 격리 및 내결함성 (Resilience)장애 전파 차단Producer, Broker, Consumer 간 경계가 존재하여 하나의 장애가 전체 시스템에 영향 미치지 않음메시지 큐가 일시적인 장애를 버퍼링함으로써 회복 가능성 제공
복구 용이성이벤트 로그 또는 스토어를 통해 실패 후 재처리 가능Event Replay, DLQ, Retry Queue 등의 패턴 적용 가능
6. 통합 및 상호운용성 (Interoperability)시스템 통합 용이성다양한 언어와 플랫폼을 사용하는 시스템 간 메시지 기반 통합 가능AMQP, MQTT, Kafka 등 프로토콜 기반 통합 패턴 적용 가능
API 독립성직접적인 REST API 호출 없이도 서비스 간 데이터 연계 가능이벤트 기반 Publish/Subscribe 모델로 메시지 흐름 설계 가능
7. 비용 효율성 (Cost Efficiency)리소스 최적화Polling 제거 및 불필요한 컴퓨팅 자원 사용 감소로 클라우드 비용 절감 가능Serverless + 메시징 조합 시 비용 최적화 구조로 확장 가능
온디맨드 처리요청이 있을 때만 이벤트 처리 → 유휴 시간 동안 리소스 소비 없음클라우드 환경의 pay-as-you-go 모델에 적합

요약

핵심 특성설명 요약
확장성소비자 수평 확장, 트래픽 분산 처리 가능
실시간성이벤트 발생 즉시 반응, 빠른 사용자 피드백 제공
디커플링서비스 간 의존성 최소화 → 독립 개발, 유지보수 쉬움
장애 격리개별 장애 발생 시 전체 영향 차단, 메시지 기반 복구 지원
비용 절감Polling 제거, Serverless 와 결합 시 비용 효율 극대화
유연성 및 통합성시스템 간 통합 및 확장 용이, 플랫폼 언어 독립적 메시징 환경 구현 가능

단점과 문제점 그리고 해결방안

단점

항목설명주요 원인해결 방안
설계/운영 복잡성이벤트 흐름 관리, 라우팅, 스키마 진화 등으로 아키텍처가 복잡해짐분산 구조, 다양한 컴포넌트 (브로커, 스키마 등) 존재Event Portal, AsyncAPI 도입, IaC 관리 자동화
디버깅 및 추적 어려움이벤트 기반 비동기 처리로 흐름 파악 및 Root Cause 분석이 어려움비동기, 병렬, 분산 흐름OpenTelemetry 기반 분산 트레이싱, 중앙 로깅, Correlation ID 활용
데이터 일관성 문제Eventually Consistent 모델 특성상 상태 불일치 발생 가능비동기 처리, 병렬성Saga 패턴, Outbox 패턴, 보상 트랜잭션 등 도입
중복 이벤트 처리재시도 시 멱등성 보장 실패 시 중복 처리 가능성 있음네트워크 장애, 중복 송신Idempotency 설계, 고유 메시지 ID, Deduplication 로직 구현
이벤트 순서 보장 어려움분산 환경이나 병렬 처리 환경에서 순서 보장이 어려움파티셔닝, 비순차 소비Partition Key 기반 순서 보장, Single Writer Principle 적용
운영/학습 부담인프라 구성 요소가 다양하고 도입 시 조직적 전환이 필요브로커, DLQ, 모니터링 도구 등 학습 필요클라우드 매니지드 브로커 사용, 교육 및 점진적 전환
테스트 자동화 어려움이벤트 시퀀스, 상태 기반 테스트가 복잡하고 불확정성 존재비동기 흐름, 상태 비저장테스트 시뮬레이터, 시나리오 기반 테스트 자동화

문제점

항목설명원인영향탐지 및 진단예방 방법해결 기법/패턴
메시지 유실이벤트가 브로커/네트워크 장애로 인해 손실됨Ack 실패, 비영속 처리, 버퍼 초과데이터 유실, 상태 불일치브로커 메트릭, 실패 로그Durable Queue 구성, Ack 보장DLQ 구성, 재처리 스크립트, Kafka EOS 적용
메시지 중복 처리동일 이벤트가 여러 번 처리되어 중복된 결과 발생재시도 로직, 멱등성 미보장데이터 중복, 로직 오류메시지 ID 분석, 처리 이력 로깅Idempotent Consumer 설계, 키 기반 메시지 추적이벤트 ID 기반 deduplication, 트랜잭션 로그 검증
이벤트 순서 왜곡처리 순서가 어긋나 상태 불일치 혹은 비즈니스 로직 충돌 발생파티셔닝 미설정, 병렬 처리순서 의존 로직 오류Sequence ID, Timestamp 분석Partition Key 기반 순서 보장라미포트 타임스탬프, 정렬 큐, Single Writer 적용
스키마 불일치 오류Producer/Consumer 간 스키마 변경 불일치로 인해 처리 오류 발생Schema Evolution 미관리Consumer 오류, 메시지 실패Schema Validation 실패 로그 분석Schema Registry, 스키마 버전 전략 관리Forward/Backward Compatibility 설계, Contract Testing 적용
메시지 지연이벤트 처리 지연으로 인해 SLA 위반 또는 실시간 처리 실패브로커 과부하, 소비자 병목응답 지연, 경보 누락Consumer Lag, Latency 모니터링Auto-scaling, QoS 설정, 파티션 증설스로틀링, 우선순위 큐, Competing Consumer 패턴
오작동 루프잘못된 라우팅 혹은 이벤트 재발행 구조로 무한 이벤트 루프 발생이벤트 전파 경로 설계 미흡시스템 과부하, 서비스 불능TTL 설정, 순환 감지 트레이싱라우팅 정책 정형화, 이벤트 전파 제어TTL 메시지 만료 정책, DLQ 및 Dead Cycle 탐지 로직 도입
컨슈머 처리 지연소비자가 이벤트를 제때 처리하지 못하여 대기열이 증가처리 성능 부족, 리소스 미할당실시간성 저하, 시스템 백업Queue Depth, 처리율 모니터링컨슈머 수 확장, 워커 병렬화Backpressure 적용, Auto Scaling, SEDA 패턴

도전 과제

카테고리도전 과제원인영향탐지/진단 방법예방 전략해결 방법 및 기술
1. 확장성대규모 이벤트 처리이벤트 볼륨 급증시스템 과부하, 지연, 브로커 병목메트릭 기반 모니터링, 큐/파티션 지표 추적적절한 파티션/리플리카 수 조정, QoS 설정, Auto ScalingKafka 리밸런싱, Consumer Group 병렬 처리, 리소스 최적화
글로벌 멀티리전 메시징지리적 분산 환경레이턴시 증가, 데이터 동기화 지연지역별 전송 지표 추적, 토픽 복제 상태 분석Geo-partitioning, 트래픽 분산, Conflict 방지 설계Cross-region replication, CRDT 기반 데이터 구조 설계
2. 일관성 및 순서이벤트 순서 보장분산 환경의 비동기 처리, 다중 소비자 구조이벤트 순서 오류 → 로직 불일치이벤트 흐름 로깅, 이벤트 ID 추적키 기반 파티셔닝, 정렬된 큐 사용Partition key 기반 순서 유지, 벡터 클록 사용, Ordered Queue 설계
이벤트 중복 및 멱등성 처리재시도, 브로커 재전송 등중복 이벤트 처리로 상태 오염메시지 해시/ID 비교, 중복 수신율 모니터링이벤트 ID 기반 중복 방지 설계멱등성 보장 로직 구현, Outbox 패턴, DB 유니크 제약 적용
3. 트랜잭션 및 데이터 일관성분산 트랜잭션 처리서비스 간 연쇄적인 데이터 변경 요구부분 실패 발생 시 데이터 불일치로깅, 보상 트랜잭션 이벤트 추적도메인 분리, 트랜잭션 최소화Saga 패턴 (Orchestration/Choreography), 보상 트랜잭션 설계
메시지 유실 방지네트워크/브로커 장애, Ack 미수신데이터 손실 및 상태 오류 발생DLQ 누적률, 이벤트 누락률 모니터링ACK 보장, 저장 후 전송 방식 (Store & Forward)Retry + DLQ, At-least-once 처리, Event Store 활용
4. 스키마 및 메시지 진화스키마 변경/진화 관리이벤트 포맷 변경구독자 장애, 호환성 오류Schema Registry 로그, 메시지 파싱 실패 분석backward/forward 호환성 설계, 버전 명시화Avro/Protobuf + Schema Registry, CI/CD 에서 호환성 검증 자동화
이벤트 크기 관리과도한 페이로드 포함네트워크 대역폭 증가, 처리 지연메시지 크기 추적, 소비자 처리 시간 분석페이로드 최소화, 참조 설계참조 기반 메시지 구조 설계, Compression 적용
5. 가시성 및 모니터링비동기 흐름 추적메시지 기반 처리의 비직관성디버깅 어려움, SLA 위반 가능성분산 추적 도구, Correlation ID 활용로그 표준화, 이벤트 카탈로그화OpenTelemetry, Jaeger/Zipkin, Structured Logging 도입
운영 상태/지표 부족모니터링 체계 미흡병목/장애 발생 시 탐지 지연브로커 메트릭, 소비자 처리율, DLQ 비율 분석Prometheus + Grafana 대시보드 구성Kafka Exporter, SLA 기반 알람 구성
6. 보안 및 규제 대응이벤트 보안 및 인증민감 데이터 포함 메시지 처리데이터 유출, 규제 위반 위험접근 로그, 암호화 상태 모니터링TLS/mTLS 적용, OAuth2 인증, 최소 권한 설정RBAC, 토큰 기반 인증, 데이터 암호화/토큰화, 감사 로그 보관
멀티 테넌시 환경 지원테넌트 간 데이터 격리 필요데이터 누수, 성능 간섭테넌트별 이벤트 흐름 분리 분석네임스페이스 및 ACL 설계테넌트 기반 스트림 분리, 전용 리소스 할당
7. 이벤트 스토어 및 저장소이벤트 스토어 관리이벤트 지속성 유지로 인한 저장소 급증디스크 부족, 조회 성능 저하스토리지 사용률 추적, 이벤트 수명 분석TTL, 아카이빙 정책, 정기 스냅샷CQRS 기반 Read Store 설계, Event Archiving, Tiered Storage 활용

정리 요약

구분핵심 도전 요소대응 기술 및 전략 요약
확장성대량 이벤트 처리, 글로벌 메시징Partition 전략, Geo Replication, Auto Scaling
일관성순서 보장, 멱등성, 중복 처리Key 기반 처리, 이벤트 ID, 멱등성 핸들링
트랜잭션분산 트랜잭션, 메시지 유실 방지Saga 패턴, Outbox+CDC, DLQ/Retry
스키마 관리메시지 진화, 스키마 호환성 문제Schema Registry, 호환성 검증 파이프라인
운영 및 가시성비동기 추적, 지표 모니터링OpenTelemetry, Structured Logging, SLA 알람
보안 및 규제데이터 보호, 인증/인가, 테넌시 격리TLS, OAuth2, RBAC, 토큰화, 감사로그
스토리지 관리이벤트 저장소의 용량 증가, 비용/성능 이슈TTL, CQRS, Archiving, Read Optimization

분류 기준에 따른 종류 및 유형

분류 카테고리분류 기준주요 유형/옵션설명 / 특징
1. 통신 모델이벤트 전달 방식Pub/Sub, Point-to-Point, Event StreamingPub/Sub 은 1:N 브로드캐스트, P2P 는 1:1 큐 기반, Streaming 은 순차/재생 기반 전송
통신 방향단방향 (Fire-and-Forget), 양방향 (Request-Reply)WebSocket, gRPC 등 실시간 상호작용 구조에서도 이벤트 처리 가능
소비자 구조Fan-out, Competing Consumers, Consumer Group부하 분산/브로드캐스트 목적에 따라 소비자 모델 결정
2. 내구성 모델저장 방식Transient, Persisted메모리 기반 일시 저장 vs 디스크 기반 내구성 확보
이벤트 재생 가능 여부Volatile, Durable, Replayable일시 소비만 필요한 경우 vs 영속성과 재처리를 고려한 구조 설계
3. 브로커 구조배포 토폴로지Centralized, Decentralized (Mesh), HybridKafka 등 중앙 집중형 vs Solace/NATS 기반 분산형, 혼합 구조도 가능
아키텍처 토폴로지Broker Topology, Mediator Topology이벤트 중심 허브형 or 워크플로우 중심 중재자 구조 (오케스트레이션 기반)
4. 처리 모델이벤트 처리 방식Simple Event Processing, Complex Event Processing단순한 이벤트 반응 vs 패턴 매칭, 집계, 조건 기반 복합 분석 등
일관성 모델Strong Consistency, Eventual Consistency동기 기반 강일관성 vs 비동기 기반 지연된 일관성 (성능/확장성 고려)
5. 전달 보장메시지 배달 보장 수준At-most-once, At-least-once, Exactly-once이벤트 유실 허용/중복 허용/정확히 1 회 처리, 복잡도와 성능 트레이드오프
6. 이벤트 생성 방식생성 방식Event Sourcing, CDC(Change Data Capture), ECSTDB 변경 기반 이벤트 생성, 상태 변경 저장 기반 재구성, 이벤트 자체에 상태 포함
7. 이벤트 설계 전략상태 전달 방식Stateless, Event Carried State Transfer상태 없는 이벤트 vs 상태를 포함하는 이벤트 구조 (서비스 간 상태 동기화에 사용)
8. 거버넌스관리 체계Event Portal, Schema Registry, Versioning이벤트 계약, 스키마 진화, 이벤트 버전관리 등으로 안정적인 확장성과 통합 가능
9. 최적화 전략성능 패턴SEDA, Backpressure, Partitioning처리량 제어, 메시지 흐름 제어, 병렬성 향상을 위한 파티셔닝 등 적용
10. 패턴 통합아키텍처 통합 전략CQRS + EDA, DDD + EDA, Event Sourcing명령/조회 분리, 도메인 주도 설계와 결합된 이벤트 처리 구조 등

이벤트 유형 분류 전략

이벤트는 역할과 범위에 따라 Domain EventIntegration Event 로 나누며, 이는 EDA 설계에서 매우 중요하다.

항목Domain EventIntegration Event
정의도메인 내부의 의미 있는 상태 변화서비스 간 통합을 위한 이벤트
범위내부 서비스 또는 Bounded Context 범위외부 시스템 또는 다른 Bounded Context 에 전달되기 위한 이벤트
주 목적내부 비즈니스 로직 흐름 전파시스템 간 데이터 동기화 또는 통합 목적
변경 가능성상대적으로 자주 변경될 수 있음변경 시 계약 (contract) 변경에 민감. 변경 최소화 필요
구조 (Payload)도메인 모델 그대로 또는 추상화된 형태 가능API 레벨 DTO 와 유사한 명확한 필드 명세 필요
예시OrderCreatedEvent, InventoryReservedEventUserRegisteredIntegrationEvent, InvoiceGeneratedEvent

예시

Domain Event
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
  "event_id": "abc-001",
  "event_type": "InventoryAdjusted",
  "occurred_at": "2025-07-17T14:00:00Z",
  "payload": {
    "sku": "ITEM-100",
    "change": -2,
    "reason": "OrderFulfilled"
  }
}
Integration Event
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
  "event_id": "def-101",
  "event_type": "UserRegistered",
  "occurred_at": "2025-07-17T14:00:00Z",
  "payload": {
    "user_id": "U123",
    "email": "user@example.com",
    "name": "Lee HyunYoon"
  }
}

Event-Driven Architecture vs. 동기적 REST API Architecture 비교

Event-Driven Architecture

동기적 REST API Architecture

비교 분석

항목이벤트 기반 아키텍처동기적 REST API 아키텍처
통신 방식비동기 메시지 기반동기 HTTP 요청 - 응답
결합도낮음 (느슨한 결합)높음 (직접 통신)
확장성뛰어남 (수평 확장 용이)중간 수준
성능유연한 처리, 처리량 높음실시간 반응 좋음, 부하 시 지연 발생
응답 지연이벤트 처리 후 응답 (대체로 비실시간)즉시 응답 (실시간성 제공)
에러 처리재시도/대기 큐 등 복잡한 접근 필요HTTP 상태 코드 기반, 단순
추적 및 디버깅분산 추적 필요, 까다로움직관적, 요청 로그 분석 용이
운영 복잡도브로커 구성·모니터링 필요비교적 단순
적합 시나리오실시간 이벤트 처리, 스트리밍, IoTCRUD, 인증, 서버 간 직접 통신
보안 처리메시지 인증·암호화 필요HTTPS, JWT 등 인증 제공
도입 난이도도전적, 인프라 비용 증가쉬움, 넓은 도구 지원

실무 사용 예시

도메인사용 목적사용 기술 / 패턴기대 효과대표 시나리오 / 설명
E-Commerce주문 처리, 결제, 재고 연동Kafka, Redis, Event Sourcing, CQRS, Saga Pattern실시간 처리, 데이터 일관성, 장애 격리, 확장성주문 생성 → 결제 처리 → 재고 차감 → 배송 연동 등 마이크로서비스 통합 시나리오
금융 / 트레이딩실시간 거래 처리, 사기 탐지, 로그 기록Kafka Streams, Flink, Pulsar, Event Sourcing밀리초 단위 처리, 사기 탐지, 추적 가능성, 확장성카드 결제 승인, 거래 이상 징후 탐지, 이벤트 기반 회계 처리 등
IoT / 제조센서 데이터 수집, 실시간 분석 및 경보MQTT, AWS IoT Core, Kinesis, InfluxDB, Node-RED대규모 비동기 수집, 서버리스 확장성, 지연 최소화온도/습도 센서 모니터링, 기계 상태 감시, 예측 정비 시스템 등
실시간 분석 / BI사용자 행동 분석, 추천, 대시보드 갱신Flink, Kafka, Dataflow, WebSocket실시간 인사이트 도출, 사용자 경험 향상클릭스트림 분석, 실시간 추천 시스템, 실시간 KPI 대시보드 등
모니터링 / 경보시스템 상태 모니터링 및 알림 전송Prometheus, Grafana, Kafka, PagerDuty, Redis Pub/Sub실시간 알림, 장애 대응 속도 향상, SLA 보장시스템 CPU 이상 감지 시 Slack 알림, DLQ 발생 알림 등
소셜 / 메시징실시간 피드, 채팅, 알림 전송Redis Streams, RabbitMQ, Kafka실시간 UX, 확장 가능한 메시징 처리채팅앱 메시지 송수신, 팔로우 피드 생성, 실시간 알림 전송 등
게임 / 미디어실시간 게임 상태, 리더보드, 콘텐츠 분석Pulsar, Kafka, Storm, MongoDB실시간 반영, 확장성, 분석 지연 최소화사용자 행동에 따른 리더보드 갱신, 실시간 방송 트래픽 분석 등
백오피스 시스템내부 이벤트 기반 로깅 및 감사 기록 관리Event Sourcing, Kafka이력 추적, 감사 대응, 변경사항 재생주문 상태 변경 로그 기록, 사용자 활동 이력 기록 등

활용 사례

사례 1: 전자상거래 주문 처리

Workflow:

flowchart TD
  User(사용자)
  Gateway(API 게이트웨이)
  OS(Order Service)
  Payment(Payment Service)
  Inventory(Inventory Service)
  Shipping(Shipping Service)
  Kafka(Kafka Broker)
  DB(Event Store)

  User --> Gateway --> OS
  OS --> Kafka["Topic: order.created"]
  Kafka --> Payment
  Kafka --> Inventory
  Payment --> Kafka["order.paid"]
  Inventory --> Kafka["inventory.reserved"]
  Kafka --> Shipping
  Kafka --> DB

역할 분석:

EDA 도입 전후 비교:

구분사전 (동기 중심)도입 후 (이벤트 중심)
구조주문⇢결제⇢재고⇢배송 → 단일 흐름서비스 간 독립적 이벤트 방식
장애한 서비스 장애 시 전체 중단장애 서비스만 격리, 복구 가능
확장성전체 시스템 확장 필요소비자만 개별 확장 가능
모니터링단일 흐름 트레이스 어려움이벤트 추적 가능, 모니터링 용이

구현 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// orderService.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function createOrder(order) {
  await producer.send({
    topic: 'order.created',
    messages: [{ key: order.id, value: JSON.stringify(order) }]
  });
  console.log(`Order event published: ${order.id}`);
}

module.exports = { createOrder };
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// paymentService.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'payment-service' });
const producer = kafka.producer();

async function start() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'order.created' });
  await producer.connect();

  await consumer.run({
    eachMessage: async ({ message }) => {
      const order = JSON.parse(message.value.toString());
      // 결제 처리 로직
      const paymentResult = { orderId: order.id, status: 'PAID' };
      await producer.send({
        topic: 'order.paid',
        messages: [{ key: order.id, value: JSON.stringify(paymentResult) }]
      });
      console.log(`Payment processed for order ${order.id}`);
    },
  });
}

start().catch(console.error);

사례 2: 주문 처리 시스템

시스템 구성:

graph LR
    A[주문 서비스] --> B[이벤트 브로커]
    B --> C[결제 서비스]
    B --> D[배송 서비스]
    B --> E[알림 서비스]

역할:

차이점:

구현 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 이벤트 브로커 클래스
class EventBroker:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event, callback):
        if event not in self.subscribers:
            self.subscribers[event] = []
        self.subscribers[event].append(callback)

    def publish(self, event, data):
        if event in self.subscribers:
            for callback in self.subscribers[event]:
                callback(data)

# 이벤트 발행자(주문 서비스)
def order_service(broker):
    broker.publish('order', {'order_id': 123, 'product': 'Laptop'})

# 이벤트 소비자(결제, 배송, 알림 서비스)
def payment_service(data):
    print('결제 처리:', data)
def delivery_service(data):
    print('배송 처리:', data)
def notification_service(data):
    print('알림 처리:', data)

# 브로커 생성
broker = EventBroker()
broker.subscribe('order', payment_service)
broker.subscribe('order', delivery_service)
broker.subscribe('order', notification_service)

# 이벤트 발행
order_service(broker)

사례 3: Kafka 기반 주문 처리 시스템

시스템 구성:

flowchart TB
  subgraph WebApp
    U[User 주문 요청] --> API[Order API]
  end
  API --> KafkaTopic[OrderCreated Topic]
  subgraph 미판매 서비스
    InventorySvc --> KafkaTopic
  end
  subgraph 소비자들
    InventorySvc[재고 검사] --> PaymentSvc[청구 서비스]
    NotificationSvc[알림 발송]
    AnalyticsSvc[로그 분석]
  end  
  KafkaTopic --> InventorySvc
  KafkaTopic --> PaymentSvc
  KafkaTopic --> NotificationSvc
  KafkaTopic --> AnalyticsSvc

Workflow:

  1. WebApp → API 서버가 OrderCreated 이벤트 발행
  2. Kafka 는 이벤트를 InventorySvc, PaymentSvc, NotificationSvc, AnalyticsSvc 등 컨슈머에게 전달
  3. 각 컨슈머는 독립적으로 처리 수행
  4. 장애 발생 시 DLQ 처리 및 자동 복구 가능

핵심 역할:

구현 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# requirements: confluent-kafka==2.0.0
from confluent_kafka import Producer, Consumer, KafkaException
import json, time

# 1. 프로듀서 설정 및 이벤트 발행
producer_conf = {'bootstrap.servers': 'kafka:9092'}
producer = Producer(producer_conf)

def publish_order(order):
    producer.produce('OrderCreated', json.dumps(order).encode('utf-8'))
    producer.flush()

# 2. 컨슈머 설정 및 이벤트 처리
consumer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'inventory-service',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['OrderCreated'])

def process_inventory():
    while True:
        msg = consumer.poll(1.0)
        if msg is None: continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        event = json.loads(msg.value().decode('utf-8'))
        print(f"InventoryService 처리 중: 주문ID={event['order_id']}")
        # 실제 재고 검사/예외 시 DLQ logic
        time.sleep(0.1)

if __name__ == "__main__":
    import threading
    threading.Thread(target=process_inventory, daemon=True).start()
    # 예시 주문 발행
    for i in range(5):
        publish_order({'order_id': i, 'item': f'product_{i}', 'qty': i*2})
        time.sleep(0.5)

사례 4: Netflix 콘텐츠 추천 시스템

사용자의 시청 행동을 실시간으로 분석하여 개인화된 콘텐츠를 추천

시스템 구성:

graph TB
    subgraph "Event Producers"
        UP[User Playback]
        UR[User Rating]
        UV[User Views]
        US[User Search]
    end
    
    subgraph "Event Infrastructure"
        K[Apache Kafka]
        KS[Kafka Streams]
    end
    
    subgraph "Event Consumers"
        RS[Recommendation Service]
        AS[Analytics Service]
        PS[Personalization Service]
        CS[Content Service]
    end
    
    subgraph "Data Storage"
        ES[Event Store]
        RDB[Recommendation DB]
        UDB[User Profile DB]
    end
    
    UP --> K
    UR --> K
    UV --> K
    US --> K
    
    K --> KS
    KS --> RS
    KS --> AS
    KS --> PS
    KS --> CS
    
    RS --> RDB
    PS --> UDB
    AS --> ES

Workflow:

  1. 이벤트 생성: 사용자의 시청, 평가, 검색 활동이 실시간 이벤트로 생성
  2. 이벤트 스트리밍: Apache Kafka 를 통해 초당 수백만 개의 이벤트 처리
  3. 실시간 분석: Kafka Streams 를 활용한 실시간 패턴 분석
  4. 추천 업데이트: 머신러닝 모델을 통한 즉시 추천 알고리즘 업데이트
  5. 개인화 제공: 사용자별 맞춤 콘텐츠 실시간 제공

Event-Driven Architecture 의 역할:

EDA 유무에 따른 차이점:

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
"""
Event-Driven Architecture 구현 예시
Netflix 스타일 콘텐츠 추천 시스템
"""

import asyncio
import json
from datetime import datetime
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
import uuid

# ============================================================================
# 1. 이벤트 정의 (Event Definitions)
# ============================================================================

@dataclass
class Event:
    """기본 이벤트 클래스"""
    event_id: str
    event_type: str
    timestamp: datetime
    user_id: str
    data: Dict
    
    def to_json(self) -> str:
        """이벤트를 JSON으로 직렬화"""
        event_dict = asdict(self)
        event_dict['timestamp'] = self.timestamp.isoformat()
        return json.dumps(event_dict)
    
    @classmethod
    def from_json(cls, json_str: str) -> 'Event':
        """JSON에서 이벤트 역직렬화"""
        data = json.loads(json_str)
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        return cls(**data)

@dataclass
class UserViewEvent(Event):
    """사용자 시청 이벤트"""
    def __init__(self, user_id: str, content_id: str, duration: int, rating: Optional[float] = None):
        super().__init__(
            event_id=str(uuid.uuid4()),
            event_type="user_view",
            timestamp=datetime.now(),
            user_id=user_id,
            data={
                "content_id": content_id,
                "duration": duration,
                "rating": rating
            }
        )

@dataclass
class ContentRecommendationEvent(Event):
    """콘텐츠 추천 이벤트"""
    def __init__(self, user_id: str, recommendations: List[str]):
        super().__init__(
            event_id=str(uuid.uuid4()),
            event_type="content_recommendation",
            timestamp=datetime.now(),
            user_id=user_id,
            data={
                "recommendations": recommendations
            }
        )

# ============================================================================
# 2. 이벤트 브로커 (Event Broker)
# ============================================================================

class EventBroker:
    """이벤트 브로커 - 이벤트 라우팅 및 전달 담당"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.event_store: List[Event] = []
    
    async def publish(self, event: Event) -> None:
        """이벤트 발행"""
        # 이벤트 저장 (Event Sourcing)
        self.event_store.append(event)
        
        # 구독자들에게 이벤트 전달
        if event.event_type in self.subscribers:
            tasks = []
            for handler in self.subscribers[event.event_type]:
                tasks.append(self._safe_handle(handler, event))
            
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)
    
    def subscribe(self, event_type: str, handler: Callable) -> None:
        """이벤트 구독"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
    
    async def _safe_handle(self, handler: Callable, event: Event) -> None:
        """안전한 이벤트 핸들러 실행"""
        try:
            if asyncio.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)
        except Exception as e:
            print(f"이벤트 처리 오류: {e}")
    
    def get_events_by_user(self, user_id: str) -> List[Event]:
        """사용자별 이벤트 조회"""
        return [event for event in self.event_store if event.user_id == user_id]
    
    def get_events_by_type(self, event_type: str) -> List[Event]:
        """타입별 이벤트 조회"""
        return [event for event in self.event_store if event.event_type == event_type]

# ============================================================================
# 3. 이벤트 컨슈머 (Event Consumers)
# ============================================================================

class EventConsumer(ABC):
    """이벤트 컨슈머 기본 클래스"""
    
    def __init__(self, broker: EventBroker):
        self.broker = broker
        self.register_handlers()
    
    @abstractmethod
    def register_handlers(self) -> None:
        """이벤트 핸들러 등록"""
        pass

class RecommendationService(EventConsumer):
    """추천 서비스 - 사용자 시청 이벤트를 기반으로 추천 생성"""
    
    def __init__(self, broker: EventBroker):
        self.user_preferences: Dict[str, Dict] = {}
        self.content_database = {
            "movie_1": {"genre": "action", "rating": 4.5},
            "movie_2": {"genre": "comedy", "rating": 4.2},
            "movie_3": {"genre": "action", "rating": 4.7},
            "movie_4": {"genre": "drama", "rating": 4.3},
            "movie_5": {"genre": "comedy", "rating": 4.1}
        }
        super().__init__(broker)
    
    def register_handlers(self) -> None:
        """이벤트 핸들러 등록"""
        self.broker.subscribe("user_view", self.handle_user_view)
    
    async def handle_user_view(self, event: Event) -> None:
        """사용자 시청 이벤트 처리"""
        user_id = event.user_id
        content_id = event.data["content_id"]
        duration = event.data["duration"]
        rating = event.data.get("rating")
        
        # 사용자 선호도 업데이트
        self._update_user_preferences(user_id, content_id, duration, rating)
        
        # 새로운 추천 생성
        recommendations = self._generate_recommendations(user_id)
        
        # 추천 이벤트 발행
        recommendation_event = ContentRecommendationEvent(user_id, recommendations)
        await self.broker.publish(recommendation_event)
        
        print(f"[추천 서비스] 사용자 {user_id}에게 새로운 추천 생성: {recommendations}")
    
    def _update_user_preferences(self, user_id: str, content_id: str, duration: int, rating: Optional[float]) -> None:
        """사용자 선호도 업데이트"""
        if user_id not in self.user_preferences:
            self.user_preferences[user_id] = {"viewed_content": [], "preferred_genres": {}}
        
        # 시청한 콘텐츠 추가
        self.user_preferences[user_id]["viewed_content"].append({
            "content_id": content_id,
            "duration": duration,
            "rating": rating,
            "timestamp": datetime.now().isoformat()
        })
        
        # 장르 선호도 업데이트 (간단한 로직)
        if content_id in self.content_database:
            genre = self.content_database[content_id]["genre"]
            if genre not in self.user_preferences[user_id]["preferred_genres"]:
                self.user_preferences[user_id]["preferred_genres"][genre] = 0
            
            # 시청 시간과 평점을 기반으로 선호도 점수 계산
            preference_score = duration / 100  # 기본 점수
            if rating:
                preference_score *= rating / 5  # 평점 반영
            
            self.user_preferences[user_id]["preferred_genres"][genre] += preference_score
    
    def _generate_recommendations(self, user_id: str) -> List[str]:
        """추천 콘텐츠 생성"""
        if user_id not in self.user_preferences:
            return ["movie_1", "movie_2"]  # 기본 추천
        
        # 사용자가 본 콘텐츠
        viewed_content = {item["content_id"] for item in self.user_preferences[user_id]["viewed_content"]}
        
        # 선호 장르 기반 추천 (간단한 협업 필터링)
        preferred_genres = self.user_preferences[user_id]["preferred_genres"]
        
        recommendations = []
        for content_id, content_info in self.content_database.items():
            if content_id not in viewed_content:
                genre = content_info["genre"]
                if genre in preferred_genres and preferred_genres[genre] > 0:
                    recommendations.append(content_id)
        
        return recommendations[:3]  # 상위 3개 추천

class AnalyticsService(EventConsumer):
    """분석 서비스 - 이벤트 데이터를 수집하고 분석"""
    
    def __init__(self, broker: EventBroker):
        self.analytics_data: Dict[str, List] = {
            "user_engagement": [],
            "content_performance": [],
            "recommendation_effectiveness": []
        }
        super().__init__(broker)
    
    def register_handlers(self) -> None:
        """이벤트 핸들러 등록"""
        self.broker.subscribe("user_view", self.analyze_user_engagement)
        self.broker.subscribe("content_recommendation", self.analyze_recommendation)
    
    async def analyze_user_engagement(self, event: Event) -> None:
        """사용자 참여도 분석"""
        engagement_data = {
            "user_id": event.user_id,
            "content_id": event.data["content_id"],
            "duration": event.data["duration"],
            "timestamp": event.timestamp.isoformat()
        }
        self.analytics_data["user_engagement"].append(engagement_data)
        print(f"[분석 서비스] 사용자 참여도 데이터 수집: {engagement_data}")
    
    async def analyze_recommendation(self, event: Event) -> None:
        """추천 효과성 분석"""
        recommendation_data = {
            "user_id": event.user_id,
            "recommendations": event.data["recommendations"],
            "timestamp": event.timestamp.isoformat()
        }
        self.analytics_data["recommendation_effectiveness"].append(recommendation_data)
        print(f"[분석 서비스] 추천 데이터 수집: {recommendation_data}")
    
    def get_analytics_summary(self) -> Dict:
        """분석 요약 정보 반환"""
        return {
            "total_user_views": len(self.analytics_data["user_engagement"]),
            "total_recommendations": len(self.analytics_data["recommendation_effectiveness"]),
            "unique_users": len(set(item["user_id"] for item in self.analytics_data["user_engagement"]))
        }

class NotificationService(EventConsumer):
    """알림 서비스 - 사용자에게 알림 전송"""
    
    def register_handlers(self) -> None:
        """이벤트 핸들러 등록"""
        self.broker.subscribe("content_recommendation", self.send_recommendation_notification)
    
    async def send_recommendation_notification(self, event: Event) -> None:
        """추천 알림 전송"""
        user_id = event.user_id
        recommendations = event.data["recommendations"]
        
        # 실제로는 푸시 알림, 이메일 등을 전송
        print(f"[알림 서비스] 사용자 {user_id}에게 알림 전송: 새로운 추천 콘텐츠 {len(recommendations)}개")

# ============================================================================
# 4. 이벤트 프로듀서 (Event Producers)
# ============================================================================

class UserInteractionProducer:
    """사용자 상호작용 이벤트 프로듀서"""
    
    def __init__(self, broker: EventBroker):
        self.broker = broker
    
    async def user_watches_content(self, user_id: str, content_id: str, duration: int, rating: Optional[float] = None) -> None:
        """사용자 콘텐츠 시청 이벤트 생성"""
        event = UserViewEvent(user_id, content_id, duration, rating)
        await self.broker.publish(event)
        print(f"[이벤트 생성] 사용자 {user_id}{content_id} 시청 (시간: {duration}분)")

# ============================================================================
# 5. 메인 실행 코드
# ============================================================================

async def main():
    """메인 실행 함수 - Event-Driven Architecture 데모"""
    print("=== Event-Driven Architecture 데모 시작 ===\n")
    
    # 1. 이벤트 브로커 초기화
    broker = EventBroker()
    
    # 2. 이벤트 컨슈머 초기화 (서비스들)
    recommendation_service = RecommendationService(broker)
    analytics_service = AnalyticsService(broker)
    notification_service = NotificationService(broker)
    
    # 3. 이벤트 프로듀서 초기화
    user_producer = UserInteractionProducer(broker)
    
    print("모든 서비스가 초기화되었습니다.\n")
    
    # 4. 사용자 상호작용 시뮬레이션
    print("=== 사용자 상호작용 시뮬레이션 ===")
    
    # 사용자 1의 시청 패턴
    await user_producer.user_watches_content("user_1", "movie_1", 120, 4.5)  # 액션 영화
    await asyncio.sleep(0.1)  # 짧은 지연
    
    await user_producer.user_watches_content("user_1", "movie_3", 110, 4.7)  # 액션 영화
    await asyncio.sleep(0.1)
    
    # 사용자 2의 시청 패턴
    await user_producer.user_watches_content("user_2", "movie_2", 95, 4.0)   # 코미디 영화
    await asyncio.sleep(0.1)
    
    await user_producer.user_watches_content("user_2", "movie_5", 88, 3.8)   # 코미디 영화
    await asyncio.sleep(0.1)
    
    # 사용자 1이 추가로 시청
    await user_producer.user_watches_content("user_1", "movie_4", 130, 4.2)  # 드라마 영화
    await asyncio.sleep(0.1)
    
    print("\n=== 분석 결과 ===")
    summary = analytics_service.get_analytics_summary()
    print(f"총 시청 횟수: {summary['total_user_views']}")
    print(f"총 추천 생성 수: {summary['total_recommendations']}")
    print(f"고유 사용자 수: {summary['unique_users']}")
    
    print("\n=== 사용자별 추천 결과 확인 ===")
    # 사용자별 추천 결과 확인
    for user_id in ["user_1", "user_2"]:
        recommendations = recommendation_service._generate_recommendations(user_id)
        print(f"사용자 {user_id} 최종 추천: {recommendations}")
        
        # 사용자 선호도 확인
        if user_id in recommendation_service.user_preferences:
            preferred_genres = recommendation_service.user_preferences[user_id]["preferred_genres"]
            print(f"사용자 {user_id} 선호 장르: {preferred_genres}")
    
    print("\n=== 이벤트 저장소 확인 ===")
    print(f"총 저장된 이벤트 수: {len(broker.event_store)}")
    
    # 이벤트 타입별 통계
    event_types = {}
    for event in broker.event_store:
        event_type = event.event_type
        event_types[event_type] = event_types.get(event_type, 0) + 1
    
    print("이벤트 타입별 통계:")
    for event_type, count in event_types.items():
        print(f"  {event_type}: {count}개")
    
    print("\n=== Event-Driven Architecture 데모 완료 ===")

# 실행
if __name__ == "__main__":
    asyncio.run(main())

사례 5: 전자상거래 주문 시스템 실시간 처리

시스템 구성 요소:

graph TD
    UI[Frontend SPA] --> API[Order API]
    API --> SVC[OrderService]
    SVC --> DB[(Order DB)]
    SVC --> OUTBOX[(Outbox Table)]
    OUTBOX --> CONNECT[Kafka Connect CDC]
    CONNECT --> KAFKA[(Kafka)]
    KAFKA --> INV[Inventory Service]
    KAFKA --> NOTIF[Notification Service]

Workflow:

  1. 사용자 주문 → DB 저장 + Outbox insert
  2. Kafka Connect 가 Outbox 변경 감지 → Kafka 로 publish
  3. InventoryService 와 NotificationService 가 이벤트 수신 후 처리

차이점 비교:

항목전통적 방식EDA 방식
처리 흐름동기 호출 → 트랜잭션 결합비동기 이벤트 흐름
확장성단일 트랜잭션 내 처리서비스별 독립 확장
장애 복원하나 실패 시 전체 롤백실패한 서비스만 재처리

구현 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 이벤트 발행 (Producer)
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

event = {
    "order_id": "ORD-123",
    "status": "ORDER_CREATED",
    "timestamp": "2025-06-14T12:00:00Z"
}
producer.send('order-events', value=event)
producer.flush()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 이벤트 수신 (Consumer)
from kafka import KafkaConsumer

consumer = KafkaConsumer('order-events',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         group_id='inventory-group',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for msg in consumer:
    event = msg.value
    print(f"Inventory 처리 중: {event['order_id']} → 상태: {event['status']}")

사례 6: Netflix 의 콘텐츠 파이낸스 시스템

시스템 구성:

graph TB
    subgraph "Content Finance System"
        A[Content Planning Service] -->|Budget Events| B[Event Bus - Kafka]
        C[Spending Tracking Service] -->|Expense Events| B
        D[Catalog Service] -->|Content Events| B
        
        B -->|Financial Events| E[Finance Analytics Service]
        B -->|Budget Events| F[Budget Management Service]
        B -->|Report Events| G[Reporting Service]
        
        E --> H[Financial Dashboard]
        F --> I[Budget Alerts]
        G --> J[Executive Reports]
    end
    
    subgraph "External Systems"
        K[Production Systems] -->|Production Events| B
        L[Content Acquisition] -->|Acquisition Events| B
    end

Workflow:

  1. 콘텐츠 기획 단계
    • 콘텐츠 기획 서비스에서 예산 이벤트 발행
    • 예산 관리 서비스에서 예산 할당 처리
  2. 제작 진행 단계
    • 제작 시스템에서 지출 이벤트 실시간 발행
    • 지출 추적 서비스에서 예산 대비 진행률 계산
  3. 분석 및 보고 단계
    • 금융 분석 서비스에서 트렌드 분석
    • 보고 서비스에서 경영진 대시보드 업데이트

EDA 의 역할:

기존 시스템과의 차이점:

구분기존 배치 시스템EDA 기반 시스템
데이터 갱신일일 배치 처리실시간 이벤트 처리
확장성수직 확장 제한수평 확장 용이
장애 복구전체 시스템 재시작부분 서비스 복구
개발 속도통합 배포 필요독립적 서비스 배포

구현 예시

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// 이벤트 정의
class BudgetAllocatedEvent {
    constructor(projectId, amount, department) {
        this.eventId = this.generateId();
        this.eventType = 'BudgetAllocated';
        this.timestamp = new Date().toISOString();
        this.data = { projectId, amount, department };
    }
    
    generateId() {
        return 'evt_' + Math.random().toString(36).substr(2, 9);
    }
}

class ExpenseRecordedEvent {
    constructor(projectId, amount, category, vendor) {
        this.eventId = this.generateId();
        this.eventType = 'ExpenseRecorded';
        this.timestamp = new Date().toISOString();
        this.data = { projectId, amount, category, vendor };
    }
    
    generateId() {
        return 'evt_' + Math.random().toString(36).substr(2, 9);
    }
}

// 이벤트 브로커
class EventBroker {
    constructor() {
        this.subscribers = new Map();
        this.eventStore = [];
    }
    
    subscribe(eventType, handler) {
        if (!this.subscribers.has(eventType)) {
            this.subscribers.set(eventType, []);
        }
        this.subscribers.get(eventType).push(handler);
    }
    
    async publish(event) {
        // 이벤트 저장 (Event Sourcing)
        this.eventStore.push(event);
        
        // 구독자들에게 이벤트 전달
        const handlers = this.subscribers.get(event.eventType) || [];
        
        for (const handler of handlers) {
            try {
                await handler(event);
            } catch (error) {
                console.error(`Error processing event ${event.eventId}:`, error);
                // 에러 처리 로직 (Dead Letter Queue 등)
                await this.handleEventProcessingError(event, error);
            }
        }
    }
    
    async handleEventProcessingError(event, error) {
        // 실패한 이벤트를 별도 큐로 전송
        console.log(`Moving event ${event.eventId} to dead letter queue`);
    }
}

// 예산 관리 서비스
class BudgetManagementService {
    constructor(eventBroker) {
        this.eventBroker = eventBroker;
        this.budgets = new Map(); // projectId -> budget info
        
        // 이벤트 구독
        this.eventBroker.subscribe('BudgetAllocated', this.handleBudgetAllocated.bind(this));
        this.eventBroker.subscribe('ExpenseRecorded', this.handleExpenseRecorded.bind(this));
    }
    
    async allocateBudget(projectId, amount, department) {
        // 예산 할당 비즈니스 로직
        this.budgets.set(projectId, {
            totalBudget: amount,
            spentAmount: 0,
            department: department,
            createdAt: new Date()
        });
        
        // 이벤트 발행
        const event = new BudgetAllocatedEvent(projectId, amount, department);
        await this.eventBroker.publish(event);
        
        console.log(`Budget allocated: ${amount} for project ${projectId}`);
    }
    
    async handleBudgetAllocated(event) {
        // 예산 할당 이벤트 처리 (이미 allocateBudget에서 처리됨)
        console.log(`Budget allocation confirmed for project ${event.data.projectId}`);
    }
    
    async handleExpenseRecorded(event) {
        const { projectId, amount } = event.data;
        const budget = this.budgets.get(projectId);
        
        if (budget) {
            budget.spentAmount += amount;
            
            // 예산 초과 체크
            if (budget.spentAmount > budget.totalBudget) {
                await this.publishBudgetExceededAlert(projectId, budget);
            }
            
            console.log(`Updated spent amount for project ${projectId}: ${budget.spentAmount}`);
        }
    }
    
    async publishBudgetExceededAlert(projectId, budget) {
        const alertEvent = {
            eventId: 'alert_' + Math.random().toString(36).substr(2, 9),
            eventType: 'BudgetExceeded',
            timestamp: new Date().toISOString(),
            data: {
                projectId,
                totalBudget: budget.totalBudget,
                spentAmount: budget.spentAmount,
                overage: budget.spentAmount - budget.totalBudget
            }
        };
        
        await this.eventBroker.publish(alertEvent);
    }
}

// 지출 추적 서비스
class ExpenseTrackingService {
    constructor(eventBroker) {
        this.eventBroker = eventBroker;
        this.expenses = new Map(); // projectId -> expenses[]
    }
    
    async recordExpense(projectId, amount, category, vendor) {
        // 지출 기록 비즈니스 로직
        if (!this.expenses.has(projectId)) {
            this.expenses.set(projectId, []);
        }
        
        const expense = {
            id: 'exp_' + Math.random().toString(36).substr(2, 9),
            projectId,
            amount,
            category,
            vendor,
            recordedAt: new Date()
        };
        
        this.expenses.get(projectId).push(expense);
        
        // 이벤트 발행
        const event = new ExpenseRecordedEvent(projectId, amount, category, vendor);
        await this.eventBroker.publish(event);
        
        console.log(`Expense recorded: ${amount} for project ${projectId}`);
    }
}

// 분석 서비스
class AnalyticsService {
    constructor(eventBroker) {
        this.eventBroker = eventBroker;
        this.projectMetrics = new Map();
        
        // 이벤트 구독
        this.eventBroker.subscribe('BudgetAllocated', this.updateMetrics.bind(this));
        this.eventBroker.subscribe('ExpenseRecorded', this.updateMetrics.bind(this));
        this.eventBroker.subscribe('BudgetExceeded', this.handleBudgetExceeded.bind(this));
    }
    
    async updateMetrics(event) {
        const projectId = event.data.projectId;
        
        if (!this.projectMetrics.has(projectId)) {
            this.projectMetrics.set(projectId, {
                totalEvents: 0,
                budgetEvents: 0,
                expenseEvents: 0,
                lastUpdated: new Date()
            });
        }
        
        const metrics = this.projectMetrics.get(projectId);
        metrics.totalEvents++;
        
        if (event.eventType === 'BudgetAllocated') {
            metrics.budgetEvents++;
        } else if (event.eventType === 'ExpenseRecorded') {
            metrics.expenseEvents++;
        }
        
        metrics.lastUpdated = new Date();
        
        console.log(`Analytics updated for project ${projectId}:`, metrics);
    }
    
    async handleBudgetExceeded(event) {
        console.log(`ALERT: Budget exceeded for project ${event.data.projectId}`);
        // 알림 서비스로 이벤트 전달, 대시보드 업데이트 등
    }
    
    getProjectMetrics(projectId) {
        return this.projectMetrics.get(projectId);
    }
}

// 시스템 사용 예시
async function demonstrateNetflixFinanceSystem() {
    // 이벤트 브로커 초기화
    const eventBroker = new EventBroker();
    
    // 서비스들 초기화
    const budgetService = new BudgetManagementService(eventBroker);
    const expenseService = new ExpenseTrackingService(eventBroker);
    const analyticsService = new AnalyticsService(eventBroker);
    
    // 시나리오 실행
    console.log('=== Netflix Content Finance System Demo ===\n');
    
    // 1. 새로운 콘텐츠 프로젝트 예산 할당
    await budgetService.allocateBudget('project_001', 1000000, 'Original Content');
    
    // 잠시 대기 (이벤트 처리 시간)
    await new Promise(resolve => setTimeout(resolve, 100));
    
    // 2. 제작 비용 기록
    await expenseService.recordExpense('project_001', 250000, 'Production', 'Studio A');
    await expenseService.recordExpense('project_001', 150000, 'Post-Production', 'Edit House B');
    await expenseService.recordExpense('project_001', 300000, 'Marketing', 'Agency C');
    
    await new Promise(resolve => setTimeout(resolve, 100));
    
    // 3. 예산 초과 상황 시뮬레이션
    await expenseService.recordExpense('project_001', 400000, 'Additional Scenes', 'Studio A');
    
    await new Promise(resolve => setTimeout(resolve, 100));
    
    // 4. 분석 결과 확인
    console.log('\n=== Final Analytics ===');
    console.log(analyticsService.getProjectMetrics('project_001'));
    
    // 5. 이벤트 저장소 확인 (Event Sourcing)
    console.log('\n=== Event Store (First 5 events) ===');
    eventBroker.eventStore.slice(0, 5).forEach((event, index) => {
        console.log(`${index + 1}. ${event.eventType} - ${event.timestamp}`);
    });
}

// 데모 실행
demonstrateNetflixFinanceSystem().catch(console.error);

사례 7: Netflix 의 금융 데이터 처리

시스템 구성:

flowchart TD
    A[Finance Data Generator] --> B[Apache Kafka]
    B --> C[Spring Boot Microservice 1]
    B --> D[Spring Boot Microservice 2]
    B --> E[Spring Boot Microservice 3]

워크플로우:

EDA 의 역할:

EDA 유무 차이:

실무에서 효과적으로 적용하기 위한 고려사항 및 주의할 점

카테고리고려 항목설명권장사항
1. 이벤트 스키마 및 계약메시지 스키마 안정성스키마 변경 시 하위 호환성과 소비자 영향 발생Schema Registry(Avro/Protobuf) + backward/forward 호환 설계 적용
스키마 진화 관리점진적인 스키마 변경 및 버전 관리 필요CI/CD 연동 자동 검증, 버전 정책 수립, AsyncAPI 문서화 유지
메시지 크기/페이로드 설계과도한 메시지 크기는 네트워크 부하 증가 및 처리 지연 유발페이로드 최소화, 참조 기반 메시지 설계, 이벤트 계약 경량화
2. 장애 처리 및 복구DLQ(Dead Letter Queue) 설계반복 실패 메시지 축적 방지 및 재시도 정책 필요TTL, Retry Count, 알림 포함한 DLQ 구성
재처리 및 이벤트 재생 전략순서/중복/불일치 문제 발생 가능이벤트 ID 기반 멱등성, Offset 관리, Replay 시점 분기점 설정 (snapshot) 사용
에러 처리 및 재시도 정책처리 실패에 대한 유연한 대응 구조 필요Retry + Backoff 전략, Circuit Breaker 적용
3. 확장성 및 성능 최적화스케일링 전략트래픽 증가에 대응한 소비자 그룹의 확장성 확보 필요Consumer Group 수평 확장, Kafka Rebalancing 전략 설계
백프레셔 및 큐 관리소비자 처리 속도 부족 시 큐 증가 및 장애 위험큐 깊이 기반 처리 속도 조정, Adaptive Throttling, 모니터링 기반 Auto Scaling
파티션 및 데이터 할당 전략파티션 불균형으로 인한 핫 파티션 발생 위험키 기반 파티셔닝, 파티션 수 조정, 리밸런싱 도구 사용
4. 일관성 및 트랜잭션 보장데이터 일관성 보장중복, 유실, 순서 오류 등으로 인한 데이터 신뢰성 저하 가능멱등성 처리, Outbox + CDC 패턴 적용
이벤트 순서 보장 (Ordering)순서가 중요한 이벤트는 파티셔닝 및 메시지 정렬 필요동일 키 기반 파티션 전략 유지, 메시지 ordering 확인 로직 추가
5. 모니터링 및 가시성 확보분산 추적 및 트레이싱비동기 메시지 흐름 추적 어려움Correlation ID, OpenTelemetry, Jaeger, Zipkin 연동
모니터링 및 알람장애 감지 및 이벤트 지연 추적 어려움Prometheus + Grafana, Kafka Exporter 기반 메트릭 수집
이벤트 흐름 로깅 및 분석운영 시 흐름 추적 및 장애 디버깅 필수중앙 집중 로그 관리 (EFK/ELK), 표준 로깅 포맷 적용
6. 보안 및 접근 제어이벤트 암호화 및 인증이벤트 위조/탈취/도청 가능성 존재TLS/mTLS 적용, OAuth2/JWT, 메시지 무결성 검증
권한 관리 및 접근 제어구독자/발행자 간 접근 정책 미비 시 보안 취약RBAC/IAM 정책, ACL 기반 구독 제어
민감 데이터 보호개인정보 또는 중요 정보 포함 이벤트 전송 시 보안 요구페이로드 내 암호화, 토큰화, 감사 로그 관리
7. 테스트 및 운영 전략테스트 전략비동기 흐름 테스트 어려움Mock Broker, 통합 E2E 테스트 환경 구축
배포 전략메시지 변경 시 중단 없는 배포 전략 필요Rolling Update, 브로커/서비스 독립 배포 설계
비용 효율화 전략저장소 및 리소스 사용 증가 → 운영비용 상승Tiered Storage, 데이터 압축, Retention 정책 설정
조직 및 협업 표준화이벤트 혼재 및 중복 발생 가능이벤트 카탈로그, 이벤트 네이밍/스키마 표준 수립

핵심 요약

EDA 기반 SaaS 설계 전략

SaaS 와 EDA 결합 시 설계 목적
설계 아키텍처
graph TD
    CLIENT[Multi-Tenant UI/API]
    CLIENT --> SVC["Domain Service (Order/Invoice/Auth)"]
    SVC --> OUTBOX[(Outbox Table)]
    OUTBOX --> CONNECT[CDC/Kafka Connect]
    CONNECT --> BROKER[Kafka Broker]
    BROKER --> AUDIT[Audit Service]
    BROKER --> TENANT1[Tenant-1 Invoice Service]
    BROKER --> TENANT2[Tenant-2 Invoice Service]
    BROKER --> ML[Anomaly Detection ML Model]
SaaS 설계 핵심 전략
전략 항목설명적용 예시
멀티테넌시 처리이벤트에 tenant_id 필수 포함Kafka Topic 내 Header 또는 Payload
도메인 분리기능 단위 마이크로서비스 구성주문, 결제, 알림을 분리
Outbox Pattern데이터 정합성과 메시지 일관성 확보DB → Outbox Table → Kafka
동적 Subscription테넌트에 따라 동적 Consumer 구성Runtime Topic Routing (Spring Cloud Stream 등)
확장성 기반 이벤트 필터링토픽 필터, Consumer 분리Kafka Streams, Kafka Router 활용

테스트 전략

테스트 유형테스트 대상목적/전략주요 도구 및 기법
유닛 테스트이벤트 핸들러 로직단일 이벤트 처리 함수의 정확성 검증pytest, unittest, MockKafkaConsumer, mock 객체 주입
통합 테스트Producer ↔ Broker ↔ Consumer 전체 흐름실제 메시지 브로커 포함, end-to-end 이벤트 흐름 검증Testcontainers, Embedded Kafka, Spring Test, FastAPI TestClient
계약 테스트Producer ↔ Consumer 간 메시지 계약메시지 스키마 호환성 및 계약 위반 방지Pact, AsyncAPI, Kafka Schema Registry
회귀 테스트과거 이벤트의 재처리 검증기존 이벤트 처리 로직에 대한 후행 이슈 방지Event Replay, Kafka Replay, Event Store Replay
성능 테스트Broker 처리량, Consumer 처리 속도처리량/지연/스루풋 측정, 병목 파악k6, Locust, kafka-producer-perf-test.sh, Gatling
Chaos 테스트브로커 및 컨슈머 장애 대응 시나리오장애 발생 시 복원력과 격리성 확인Gremlin, Chaos Monkey, 네트워크 차단 시뮬레이션 등
테스트 전략 적용 포인트
항목전략/설명권장 사항
스키마 안정성Producer/Consumer 간 메시지 호환성 테스트Schema Registry, Mock Registry, 계약 테스트 통합 활용
이벤트 순서 보장파티션 키 기준 순서 테스트 (FIFO 보장 여부)동일 partition key 로 연속 메시지 발행
동시성/경쟁 조건 탐지Consumer 그룹 내 병렬 처리 시 Race Condition 검증다중 컨슈머 환경 구성, 처리 순서 로깅 후 분석
에러 및 복원력컨슈머 실패 시 DLQ 이동, 재시도 전략 테스트Retry Handler, Dead Letter Queue, Circuit Breaker 사용
성능 임계 테스트고트래픽 환경에서 처리량 한계 및 메시지 지연 시간 측정대용량 이벤트 발행, 메시지 Queue 적체 시나리오 구성
유닛 테스트 코드 검토 및 개선 (Python 기준)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# --- 이벤트 핸들러 ---
def handle_order_created(event):
    if not isinstance(event, dict) or "status" not in event:
        raise TypeError("Invalid event format")
    if event["status"] != "ORDER_CREATED":
        raise ValueError("Invalid event status")
    
    # 실제 비즈니스 로직 예: 재고 차감
    return f"Handled order {event['order_id']}"

# --- 테스트 코드 ---
def test_handle_order_created_valid():
    event = {"order_id": "ORD-001", "status": "ORDER_CREATED"}
    result = handle_order_created(event)
    assert result == "Handled order ORD-001"

def test_handle_order_created_invalid_status():
    event = {"order_id": "ORD-002", "status": "CANCELLED"}
    try:
        handle_order_created(event)
    except ValueError as e:
        assert str(e) == "Invalid event status"

의의:

  • 이벤트 유효성 검증 추가
  • 예외 케이스 테스트 포함
  • 단위 테스트의 독립성과 재현성을 강화

퍼블리싱 전달 보증 수준 (Delivery Guarantees)

각 메시징 시스템은 서로 다른 전달 보증 수준을 제공하며, 보증 수준 설계 시 중요한 고려사항이다:

보증 수준정의보장 조건 및 전제실현 메커니즘대표 활용 시나리오예시 기술
At-most-once최대 한 번 전달 (손실 허용)네트워크 오류, Consumer 다운 시 손실 발생 가능- 메시지 전송 후 즉시 삭제
- Ack 없이 전송
- Retry 없음
- 로그 전송
- 실시간 모니터링
- Push 알림 등 손실 허용 가능한 상황
UDP 기반 전송
Kafka (acks=0)
At-least-once최소 한 번 이상 전달 (중복 허용)- 메시지 손실 없이 도착 보장
- 중복 발생 가능성 존재
- 메시지 재시도 (Retry)
- Ack 기반 수신 확인
- 브로커 메시지 저장 유지
- 결제 처리
- 주문 시스템
- 데이터 적재 (중복 필터링 가능 시)
Kafka (acks=1), RabbitMQ, SQS
Exactly-once정확히 한 번 전달 보장- 메시지 중복도, 손실도 모두 방지
- 시스템 전반의 idempotency 필요
- Idempotent Producer 설정
- Transactional Consumer
- 메시지 상태 관리, 중복 제거 로직 필요
- 금융 이체
- 분산 트랜잭션
- 이벤트 소싱 기반 영속성 처리
Kafka EOS
Kafka Streams
SQS FIFO + deduplication ID
핵심 비교 요약
항목At-most-onceAt-least-onceExactly-once
신뢰도낮음 (손실 가능)중간 (중복 가능)높음 (중복/손실 모두 방지)
성능매우 빠름중간상대적으로 낮음 (복잡한 처리 필요)
복잡도낮음중간높음
실현 비용거의 없음재시도, 저장소 필요트랜잭션 로그, 상태 관리 등 부가 시스템 요구
적합 용도비핵심 정보, 손실 허용핵심 정보, 중복 허용비즈니스 핵심 정보, 중복/손실 모두 금지
Kafka - 전달 보증 구현 방법
보증 수준설정 항목설정 방법 및 설명
At-most-onceacks=0프로듀서가 메시지 전송 후 Ack 기다리지 않고 즉시 성공 처리 (손실 가능)
retries=0재시도 없이 실패 시 메시지 손실 허용
At-least-onceacks=1 또는 acks=all브로커가 수신 확인을 해야 전송 성공 처리 (acks=all 은 더 안정적)
enable.auto.commit=false컨슈머가 명시적으로 오프셋 커밋 (처리 실패 시 재시도 가능)
retries > 0메시지 전송 실패 시 자동 재시도
Exactly-onceenable.idempotence=true동일 메시지 재전송 시 중복 방지
transactional.id트랜잭션 기반으로 프로듀서 구성
컨슈머 측 read_committed 설정커밋된 메시지만 읽도록 설정
RabbitMQ - 전달 보증 구현 방법
보증 수준설정 항목설정 방법 및 설명
At-most-onceauto-ack=true컨슈머가 수신 즉시 Ack 전송 → 처리 실패 시 재시도 불가 (손실 허용)
durable=false, persistent=false큐 및 메시지를 메모리 기반으로 유지
At-least-onceauto-ack=false컨슈머가 처리 완료 후 수동 Ack 전송
durable=true, persistent=true큐와 메시지를 디스크에 저장
prefetch > 0QoS 제어로 병렬 처리 제어
Exactly-once명시적 지원 없음 (RabbitMQ 자체 한계)대신 중복 허용 후 Idempotent Consumer 로 구현 필요
message deduplication 로직 수동 구현메시지 ID 기반 중복 방지 (Redis/DB 활용 등)

컴플라이언스 보장 (Compliance Assurance)

카테고리항목설명권장 전략 및 도구 예시
1. 데이터 감사 및 추적성이벤트 이력 관리 (Audit Trail)모든 이벤트의 생성, 발행, 소비 내역을 시간 순으로 기록하여 변경 추적 및 감사 대응 가능Kafka Topic Logging, Event Store, Kafka Connect + Sink Connectors
감사 로그 및 액세스 추적누가 언제 어떤 이벤트를 처리했는지에 대한 상세 로그 확보SIEM 연동 (예: ELK, Splunk), Prometheus Alert + Loki
2. 개인정보 및 민감정보 보호PII/PHI 보호개인식별정보, 의료정보 등을 이벤트 내에서 암호화 또는 토큰화하여 보호AES256 암호화, Vault, Macaroon Token, Field-Level Encryption
스키마 기반 보호스키마를 활용하여 민감 필드에 대한 정책 기반 마스킹/제한Confluent Schema Registry + Field Rule 정책
3. 인증 및 접근 통제인증/인가 및 권한 관리이벤트 브로커, Consumer, Producer 간의 접근 권한을 엄격히 관리Kafka ACL, OAuth2.0, JWT, IAM 정책 기반 제어
멀티 테넌시 지원사용자/조직 별 격리된 리소스와 권한 정책 제공Topic per Tenant, Resource Quota, Namespace 기반 격리
4. 데이터 보존 및 삭제 정책이벤트 보존 정책 설정법적/규제 요건에 따라 보존 주기 관리 (예: 1 년, 7 년)Kafka Retention 설정, Tiered Storage, TTL 기반 Cleanup
규제 기반 데이터 삭제데이터 삭제 요청 (예: GDPR 의 Right to be Forgotten) 에 따른 제거 절차 구현데이터 제거 워크플로우 정의, Kafka Tombstone 메시지 활용
5. 지역 및 규제 준수데이터 주권 및 지역 제한특정 지역 내 데이터 저장 및 전송 요구 (GDPR, HIPAA, 등)Region 기반 Partition 분리, Cross-Region 암호화 Replication
국제 규제 대응ISO27001, SOC2, HIPAA, GDPR 등 글로벌 기준에 대한 시스템 대응감사 프로세스 문서화, CSP 규제 대응 체크리스트, 데이터 접근 로깅

최적화하기 위한 고려사항 및 주의할 점

카테고리최적화 항목설명권장사항
성능배치 처리 (Batching)처리량을 높이고 네트워크 호출을 줄이기 위해 다수의 메시지를 일괄 처리적응형 배치 크기 조정, 처리 지연 한계 설정
압축 (Compression)전송 메시지의 크기를 줄여 네트워크 대역폭과 I/O 부담을 줄임Snappy, LZ4, ZSTD 등 경량 압축 알고리즘
캐시 (Caching)반복적인 데이터 조회를 줄이고 처리 속도를 개선CQRS read-side 에 Redis 등 캐시 계층 도입
메시지 필터링불필요한 메시지 처리 방지로 소비자 부하 경감브로커 수준 필터링 또는 Subscription 조건 지정
브로커 튜닝큐 깊이, I/O 성능, 스레드 풀 설정 등을 통한 메시지 처리 최적화Kafka 설정 조정 (num.network.threads 등)
확장성파티셔닝 전략 (Partitioning)메시지를 키 기반으로 파티셔닝하여 병렬 처리 성능과 확장성 확보비즈니스 키 기반 파티션 키 설계, 핫스팟 방지
Consumer Group 병렬 처리Consumer 를 여러 인스턴스로 병렬화하여 메시지 처리 속도 증가Consumer Group 구성, Partition 수와 동기화
Auto Scaling트래픽 변화에 따라 자동으로 인스턴스를 확장 또는 축소큐 깊이, CPU, 메시지 처리율 기반 스케일링 트리거 설정
로드 밸런싱다양한 인스턴스에 부하를 균등 분산라운드로빈, 가중치 기반 로드 밸런서 구성
신뢰성 및 안정성이벤트 순서 보장분산 처리 환경에서 순서를 보장해야 하는 요구사항 대응Kafka 파티션 단위 순서 보장, 키 기반 순서 설계
멱등성 (Idempotency)동일 메시지 중복 처리 방지를 통해 중복 트랜잭션 발생 방지이벤트 ID 사용, 컨슈머 측 중복 체크 로직 구현
복제 및 내결함성브로커 장애 및 데이터 손실 방지Kafka 3-way replication, 페일오버 구성
장애 복구 및 재처리실패 메시지에 대한 DLQ 처리 및 재시도 전략DLQ 구성, Retry 정책, Backoff 설정
이벤트 크기 제한과도하게 큰 이벤트가 브로커 성능을 저하하지 않도록 제한payload 크기 제한, 압축 활용
운영 및 관찰성모니터링 및 경고지연, 오류율, 처리율 등 주요 지표에 대한 실시간 모니터링Prometheus + Grafana 대시보드, Alert 설정
추적 (Tracing)이벤트 흐름에 대한 분산 추적 가능Correlation ID, OpenTelemetry, Jaeger
백프레셔 (Backpressure)소비자가 따라가지 못할 때 시스템 전체 과부하 방지Reactive Streams, 메시지 버퍼 적용, Rate Limiter
부하 테스트 및 예측실제 트래픽을 반영한 사전 테스트로 병목 탐지Gatling, k6, chaos test 구성
메시지 계약 및 일관성메시지 스키마 관리생산자 - 소비자 간 메시지 계약 안정성 유지Schema Registry, 버전 관리, 호환성 정책
정확성 보장 전송 방식메시지가 중복되거나 누락되지 않도록 전송 전략 정의At-least-once 기본, Exactly-once 필요 시 트랜잭션 연계
메시지 정리 전략처리 완료된 메시지의 보관 주기 및 스토리지 최적화Kafka retention 설정, 압축 주기, 아카이빙 정책

핵심 정리

분류핵심 체크포인트
설계 관점파티셔닝, 메시지 크기, 이벤트 모델링
처리 관점병렬성, 배치, 캐싱, 필터링, 압축
운영 관점모니터링, 트레이싱, DLQ, 백프레셔 대응
확장/복구 관점오토스케일링, 페일오버, 복제, 멱등성, 정확한 재처리 설계
안정성 관점메시지 스키마 안정성, 순서 보장, 트랜잭션 보장 수준 (at-least, exactly)

주제와 관련하여 주목할 내용

카테고리주제핵심 항목설명 및 기술적 적절성 검토
아키텍처 특성이벤트 기반 아키텍처 (EDA)비동기성생산자와 소비자 간 시간적 분리로 시스템 유연성과 확장성 확보 가능. 대기 없이 처리가 가능한 구조.
확장성리스너 또는 소비자 추가가 쉬워 수평적 확장에 유리. 마이크로서비스 구조와 궁합이 좋음.
장애 격리생산자와 소비자의 결합도가 낮아 개별 실패가 전체 시스템에 영향을 덜 미침.
실시간성이벤트 발생과 동시에 처리 흐름 시작 가능. 스트리밍 처리 기반 설계에 적합.
패턴 및 설계Event Sourcing이벤트 기반 상태 재구성모든 변경 사항을 이벤트로 저장하여 상태 복구, 감사 추적 용이. Snapshot 최적화 필요.
CQRS명령 (Command) 과 조회 (Query) 분리성능 최적화, 확장성 향상에 기여. 읽기/쓰기 모델 독립 유지 가능.
Event Carried State Transfer이벤트에 상태를 포함하여 전달수신자가 자체 상태를 유지하지 않고 처리 가능. 분산 환경에서의 상태 동기화에 적합.
Saga Pattern분산 트랜잭션 관리 패턴Orchestration 또는 Choreography 방식으로 마이크로서비스 간 트랜잭션 보장.
Outbox PatternDB 트랜잭션 내 메시지 발행DB 와 메시지 전송의 원자성 보장. Kafka Connect 등과 연계해 안정성 확보.
도구 및 플랫폼Apache Kafka분산 스트리밍 플랫폼, Streams, KSQL 등고성능/확장성/내결함성 기반 대용량 이벤트 처리. 거의 업계 표준.
Apache Pulsar클라우드 네이티브 메시징, Geo-ReplicationKafka 보다 다양한 멀티테넌시 기능. 성능/기능 균형.
RabbitMQAMQP 기반 메시지 브로커실시간 처리와 라우팅에 적합. 복잡한 라우팅 및 보안에 유리.
NATS경량 메시징 시스템마이크로서비스 사이 고속 통신용으로 설계. 낮은 지연 시간에 강점.
Event Store이벤트 영속 저장소Event Sourcing 기반의 스토리지로 Snapshot 지원 필수.
Event Mesh분산 이벤트 브로커 네트워크서로 다른 메시지 시스템과 클라우드/온프레미스 환경 통합에 강력함. Solace, Confluent 등이 지원.
운영 및 모니터링Distributed TracingJaeger, Zipkin, OpenTelemetry이벤트 흐름 추적 및 디버깅. 메시지 기반 구조에서 추적 필수.
Observability로그, 메트릭, 트레이스 통합OpenTelemetry 기반 수집. 이벤트 손실/지연 모니터링에 유리.
보안TLS, OAuth, ACL이벤트 전송 시 암호화 및 권한 제어는 필수. 메시지 기반 보안 취약점 대비 필요.
기술 트렌드Serverless EDAAWS Lambda, Azure Functions이벤트 발생에 반응하여 실행되는 함수 기반 처리. 비용 효율적이며 운영 단순.
AI/ML 통합실시간 스트림 처리에서 추론이벤트 흐름에서 실시간 추론 모델을 직접 실행. 스트림 기반 ML 과 궁합.
Edge ComputingIoT/5G 기반 엣지 이벤트 처리중앙 서버가 아닌 말단에서 이벤트 처리 가능. 지연 최소화.
Multi-Cloud EDACross-Cloud 메시징이벤트 Mesh + 클라우드 간 라우팅 전략 필수. 장애 회복 및 지리적 확장에 유리.
Cloud NativeKubernetes, Service Mesh이벤트 기반 아키텍처를 컨테이너 환경에 통합. Sidecar 패턴과 함께 운영 최적화.
메시징 패턴Pub/Sub생산자 - 소비자 간 비동기, 다대다 모델느슨한 결합, 확장성 핵심. 이벤트 기반 시스템의 근간.

주제와 관련하여 반드시 학습해야할 내용

카테고리주제핵심 항목설명
1. 핵심 개념이벤트 기반 아키텍처 (EDA)이벤트, Producer, Consumer, Broker, 비동기, 디커플링시스템 구성 요소 간 느슨한 결합을 통해 확장성과 유연성을 확보하는 아키텍처 스타일
메시징 패턴Pub/Sub, Point-to-Point메시지 전달 방식으로 다수 소비자 또는 단일 소비자 구조 선택 가능
비동기 프로그래밍Callback, Promise, Future, Async/Await시간적으로 분리된 이벤트 기반 처리 메커니즘
분산 시스템 기초 이론CAP, ACID vs BASE, Consistency Models분산 환경에서의 트레이드오프 이해는 메시징 기반 설계에 필수
2. 아키텍처 패턴CQRSCommand-Query 분리쓰기와 읽기를 분리하여 확장성, 성능, 유지보수성 향상
Event SourcingEvent Log, Replay상태를 이벤트의 흐름으로 구성하여 재구성 가능
Saga Pattern보상 트랜잭션, 롱런 트랜잭션 처리분산 시스템에서 롤백 불가능한 트랜잭션 관리
Outbox PatternDB 와 메시지 브로커 간 정합성 확보트랜잭션 로그 기반 이벤트 발행 전략
SEDA단계별 큐 분리 구조각 처리 단계를 분리하여 유연한 비동기 처리 구조 구성
3. 이벤트 설계이벤트 모델링Domain Event vs Integration Event내부 도메인 로직과 외부 통합 메시지를 구분하여 설계
이벤트 스키마 관리Schema Registry, Schema Evolution메시지 포맷의 버전 관리 및 하위 호환성 확보
멱등성 / 중복 방지Idempotency Token, Deduplication중복 이벤트 처리 방지 및 안전한 재처리 가능성 확보
4. 메시지 브로커주요 브로커 기술 비교Kafka, RabbitMQ, Pulsar, NATS메시징 기술별 성능, 보장 수준, 프로토콜 특성 비교
Kafka 구조 심화Topic, Partition, Consumer Group병렬 처리, 확장성 확보를 위한 핵심 구조 이해
Routing & FilteringTopic Routing, Header 기반 필터링이벤트 전달 경로 설정 및 관심사 분리 구현
5. 운영 및 신뢰성메시지 추적 및 장애 대응Correlation ID, DLQ, Retry, Backpressure안정적 처리를 위한 장애 인식 및 복구 설계
모니터링 & 관찰 가능성Metrics, Logging, Tracing (Jaeger, Prometheus)이벤트 흐름, 지연, 에러에 대한 시각적 추적 가능성 확보
보안 및 인증TLS 암호화, OAuth2, 인증/인가메시징 전송의 무결성과 안전성 확보
정확성 보장At-least-once, At-most-once, Exactly-once처리 보장 수준에 따른 전략 및 보상 설계
6. 연계 기술 및 배포DevOps 및 배포 환경CI/CD, IaC, Canary, Blue-Green메시징 구성 요소의 지속적 배포 및 관리
컨테이너 및 오케스트레이션Docker, Kubernetes메시지 브로커 및 소비자 서비스의 배포 및 확장
스트림 처리 엔진Kafka Streams, Apache Flink이벤트 스트림 기반의 실시간 분석과 처리 기술
프레임워크 통합Spring Cloud Stream, NestJS Event Modules언어 및 플랫폼 기반 프레임워크 연계

용어 정리

카테고리용어설명
핵심 개념Event (이벤트)시스템 내에서 발생하는 의미 있는 상태 변화 또는 비즈니스 사건을 나타내는 불변 메시지 객체
Event Producer (발행자)이벤트를 생성하고 브로커에 발행하는 컴포넌트
Event Consumer (소비자)이벤트를 구독하여 처리하는 컴포넌트
Event Broker / Bus (브로커)이벤트를 중계, 라우팅, 전달하는 미들웨어 (예: Kafka, RabbitMQ 등)
Event Store (이벤트 저장소)이벤트를 영속적으로 저장하는 로그 기반 저장소
아키텍처 스타일Event-Driven Architecture느슨하게 결합된 컴포넌트가 이벤트를 통해 비동기적으로 상호작용하는 아키텍처
Event Mesh분산 환경에서 여러 이벤트 브로커를 연결해 메시지를 전파하는 메시징 인프라
Broker Topology중앙 브로커 기반의 메시지 라우팅 구조
Mediator Topology중앙 중재자가 워크플로우나 로직을 조정하는 구조 (ex: BPM, Orchestration 등)
아키텍처 패턴Event Sourcing시스템 상태 변경을 이벤트로 기록하여 상태를 복원할 수 있게 하는 아키텍처 패턴
CQRSCommand(명령) 와 Query(조회) 를 분리하여 확장성과 성능을 개선하는 패턴
Saga Pattern분산 트랜잭션에서 보상 트랜잭션 기반으로 일관성을 유지하는 장기 실행 트랜잭션 처리 패턴
Outbox Pattern데이터베이스와 이벤트 발행 사이의 정합성을 확보하기 위한 설계 기법
SEDA (Staged Event-Driven Arch.)각 단계 (stage) 를 독립된 큐로 분리하여 병렬성과 처리 유연성을 높이는 구조
메시징 기술 및 도구Apache Kafka분산 메시징 플랫폼. 고성능, 파티셔닝, 스트리밍 처리에 적합
RabbitMQAMQP 기반의 메시지 브로커. 라우팅 유연성과 신뢰성 높은 전송 지원
Schema Registry메시지 스키마를 중앙에서 관리하고 호환성을 보장하기 위한 시스템
AsyncAPI비동기 메시징 시스템의 계약 및 문서를 정의하기 위한 오픈 표준
Jaeger트레이싱을 위한 오픈소스 도구로, 이벤트 흐름 추적에 사용됨
운영 및 성능 기법DLQ (Dead Letter Queue)처리 실패한 메시지를 별도로 보관하여 재처리나 문제 분석에 활용
Correlation ID요청 - 응답 메시지를 연결하기 위한 고유 식별자
Idempotency (멱등성)동일한 메시지를 여러 번 처리해도 동일한 결과를 보장하는 성질
Exactly-once Delivery메시지가 중복 없이 정확히 한 번만 전달되도록 보장하는 전략
Eventual Consistency분산 환경에서 일시적으로 불일치하더라도 최종적으로 일관성에 도달하는 속성
Backpressure시스템 과부하를 방지하기 위해 메시지 흐름을 제어하는 메커니즘
Circuit Breaker장애 발생 시 시스템 전체 전파를 막기 위한 보호 장치
처리 기술Stream Processing실시간으로 지속적인 데이터 흐름을 처리하는 방식 (예: Flink, Kafka Streams 등)
Message Queue메시지를 큐에 임시 저장하고 순서 보장 및 비동기 처리를 가능하게 하는 구조
Partition메시지를 병렬로 처리할 수 있도록 Kafka 에서 사용하는 논리 단위

참고 및 출처

개요 및 개념

아키텍처 패턴 & 디자인

도구 및 기술 스택

실무 적용 사례

장점과 단점 분석