CQRS (Command Query Responsibility Segregation)

CQRS 는 애플리케이션의 쓰기 (Command)읽기 (Query)별도 모델·경로로 분리해 각 측면을 독립적으로 최적화·확장하는 패턴이다.

쓰기 모델은 도메인 규칙과 상태 변경을 책임지고, 읽기 모델은 조회 성능에 맞춰 전용 뷰 스키마/저장소를 쓴다.
필요에 따라 Event Sourcing 을 결합해 이벤트로 읽기 모델을 갱신할 수 있으나 필수는 아니다.

마이크로서비스·복잡 도메인에서 성능·확장성·보안을 높일 수 있지만, 구현·운영 복잡성일관성 관리 비용이 증가하므로 신중한 적용이 요구된다.

핵심 개념

CQRS 는 “쓰기 전용 모델” 과 “읽기 전용 모델” 을 완전히 분리해 각자에 맞게 설계·확장하는 방식이다.
쓰기는 도메인 규칙/트랜잭션을 지키고, 읽기는 질의 성능만 바라본다. 둘 사이 데이터는 주로 이벤트로 흘러가며, 이 과정에서 최종일관성을 받아들인다. 이벤트 전달의 원자성Outbox로, 다 단계 쓰기 조율은 Saga로 보완한다.

개념무엇왜 중요한가
CQRS읽기/쓰기 책임·모델·저장소 분리독립 최적화/확장, 복잡성 분리, 경합 감소.
CQS명령/조회 역할 분리 원칙 (메서드 레벨)CQRS 의 이론 기반, 부작용 최소화.
Write Model도메인 규칙·트랜잭션을 담는 쓰기 전용 모델무결성 보장·비즈니스 규칙 초점.
Read Model조회 전용 프로젝션/뷰/캐시고성능 조회·저비용 스케일아웃.
Event Sourcing상태 변경을 이벤트 로그로 저장이력/감사·재생·프로젝션 생성 용이.
최종일관성읽기 모델 반영이 지연될 수 있음성능·확장성 대가를 인지하고 UX 보정 필요.
OutboxDB 변경과 이벤트 발행의 원자성 보장듀얼 라이트/유실 방지·멱등 처리 토대.
Saga분산 쓰기 단계의 조율/보상 트랜잭션다 애그리게이트/서비스 쓰기 안전화.

실무 구현과의 연관성

핵심 개념왜 연관?어떻게 구현? (대표 수단)품질 지표/체크
CQRS읽기/쓰기 성격·튜닝 포인트가 다름API/핸들러·스키마 분리, 서비스/DB 분리R/W 비율, 읽기 p95, 쓰기 p95.
Read Model조회 부하 대응·개별 최적화머티리얼라이즈드 뷰, 캐시, NoSQL, 인덱스뷰 갱신 지연 (ms), 캐시 히트율.
Write Model무결성·도메인 규칙 중심애그리게이트/도메인 서비스·강한 트랜잭션도메인 불변식 위반률, 트랜잭션 재시도율.
최종일관성분리 전파 지연의 필연UX 보정 (스낵바, 로컬 캐시), R-Y-W 전략읽기 최신성 SLA, 간극 분포.
Event Sourcing읽기 모델 생성·감사 요구이벤트→프로젝션 파이프라인재생 시간, 이벤트 멱등률.
OutboxDB 변경 + 이벤트 발행 원자화트랜잭션 내 Outbox 기록→릴레이 전송미전송 이벤트 수, 중복 전송률.
Saga분산 쓰기 조율오케스트레이션/코레오그래피·보상 트랜잭션보상률, 타임아웃률.

기초 이해 (Foundation Understanding)

개념 정의 및 본질

CQRS 는 프로그램에서 " 쓰기 (명령)" 와 " 읽기 (조회)" 를 완전히 다른 경로로 처리하는 설계 방법이다.
예를 들어, 주문을 넣을 때는 쓰기 모델이 처리하고, 주문 내역을 볼 때는 조회 모델이 처리한다.
이렇게 나누면 각 경로를 성능에 맞게 최적화할 수 있지만, 두 모델의 데이터가 맞춰져 있어야 해서 동기화가 필요하다. 작은 시스템보다는 복잡하고 읽기·쓰기 비율이 크게 다른 시스템에 유리하다.

구분내용
정의데이터 변경 (명령) 과 조회 (쿼리) 의 책임을 별도의 모델로 분리하는 아키텍처 패턴
기원Robert C. Martin 의 CQS 원칙 확장
구성 요소Command Model(쓰기 전용), Query Model(읽기 전용), 별도 저장소, 이벤트 버스
목적읽기/쓰기 요구사항을 독립적으로 최적화, 성능·확장성 향상
특징모델·저장소·서비스 계층 분리, Event Sourcing 과 잘 결합
장점읽기 성능 최적화, 확장성 확보, 유지보수 용이
단점데이터 동기화 복잡성, 과도 설계 가능성
적용 사례읽기 요청이 많은 시스템, 복잡한 도메인, DDD 환경

CQRS 는 읽기와 쓰기를 분리해 각 경로를 최적화하는 아키텍처 패턴으로, 확장성과 성능을 높이는 대신 동기화 복잡성이 따른다.

graph TB
    Client[클라이언트] --> CommandAPI[명령 API]
    Client --> QueryAPI[조회 API]
    
    CommandAPI --> CommandModel[명령 모델]
    QueryAPI --> QueryModel[조회 모델]
    
    CommandModel --> WriteDB[(쓰기 DB)]
    QueryModel --> ReadDB[(읽기 DB)]
    
    WriteDB --> EventBus[이벤트 버스]
    EventBus --> ReadDB

등장 배경 및 발전 과정

CQRS 는 **" 읽기와 쓰기 문제를 따로 푼다 “**는 생각에서 출발했다.
CQS(메서드 원칙) → CQRS(시스템 설계) 로 확장되며, 읽기는 빠르게·쓰기는 정확하게를 동시에 달성하도록 진화했다. 클라우드 시대엔 읽기를 마음껏 수평 확장하고, 쓰기는 도메인 규칙에 집중하면서 둘 사이를 이벤트/비동기로 연결하는 게 표준이 됐다.

등장 배경
발전 과정
시기사건/주요 인물왜 등장했나무엇이 개선됐나
1988~90sCQS·Bertrand Meyer부작용 분리·코드 추론성 향상명령/조회 책임 경계 확립
2006~2010CQRS 정립·Greg Young읽기/쓰기 튜닝 포인트 상이R/W 모델·스토리지 분리로 독립 최적화
2009~2012Clarified CQRS, CQRS Journey오해 정리·실무 지침 필요ES 와의 관계 정리, 레퍼런스 아키텍처·가이드 제공
2010s~현재클라우드·마이크로서비스·이벤트 기반읽기 우세·분산·독립 확장 요구읽기 수평 확장, 쓰기 무결성, 최종 일관성 운영
timeline
  title CQRS: 등장 배경 및 발전 과정
  section 개념 형성
    1988 : CQS 원칙 제안 (Bertrand Meyer) – 메서드 수준 분리
  section 패턴 정립
    2006-2010 : CQRS 명명·정립 (Greg Young) – 읽기/쓰기 분리 확립
    2009 : Clarified CQRS (Udi Dahan) – 오해 정리
  section 실무 확산
    2012 : Microsoft "CQRS Journey" – 레퍼런스 구현·가이드
    2010s-2025 : 클라우드·MSA·이벤트 기반 확산 – 수평 확장·최종 일관성

핵심 동기 및 가치 제안

CQRS 의 목적은 서로 다른 요구를 가진 읽기와 쓰기를 갈라서 잘하게 만들기다. 쓰기는 규칙·트랜잭션 일관성에 집중하고, 읽기는 빠른 응답과 다양한 뷰에 집중한다. 이렇게 나누면 각 부분을 따로 설계·배포·확장할 수 있어 성능과 유지보수성이 좋아진다. 필요하면 Event Sourcing 을 더해 이벤트로 읽기 모델을 채우고 이력/감사도 강화할 수 있지만, 반드시 결합해야 하는 것은 아니다.

범주목적/가치왜 필요한가적용 포인트
성능/확장성읽기·쓰기 독립 최적화/스케일경로 특성이 다름 (검증·트랜잭션 vs. 조회·캐시)쓰기: 도메인 모델·트랜잭션, 읽기: 뷰 DB/리플리카
복잡성 관리변경 영향 축소·테스트 용이규칙/조회 요구 동시 충족 시 결합도↑코드·스키마·핸들러를 분리 배치
유지보수/팀 조직병렬 개발·독립 배포대규모 팀에서 충돌/대기 비용 큼서비스/리포지토리 레벨 분리, CI/CD 분리
보안/권한변경 경로 엄격, 조회 경로 안전 공개쓰기 권한·검증이 더 중요커맨드 핸들러 보호, 조회는 캐시/리드뷰로 확장
이벤트성 (옵션)이력/감사/재생, 비동기 업데이트감사·타임머신 요구, 읽기 갱신 모델 필요ES 로 이벤트 발행→리드 모델 물질화 (필수 아님)

주요 특징

CQRS 는 “쓰기 길(명령) 과 읽기 길(조회) 을 아예 다른 도로로 나누는 설계 " 다. 그래서 읽기 도로는 빠르게 조회하도록 깔고, 쓰기 도로는 규칙·검증을 두껍게 깐다. 바쁘면 읽기 도로만 폭 넓게 확장할 수 있고, 쓰기·읽기를 다른 DB로 운영할 수도 있다. 다만 두 도로 사이를 이벤트로 이어 주기 때문에, 읽기 쪽이 약간 늦게 반영되는 최종 일관성을 감수해야 한다.

특징설명왜 가능한가/근거
모델 분리명령과 조회를 다른 모델·인터페이스로 분리CQRS 정의 자체가 분리 원칙을 전제.
독립 최적화/스케일읽기·쓰기 각각 스키마·캐시·인덱싱·배포 단위 최적화·확장Azure 가 독립 스케일·스키마 최적화를 명시.
저장소 분리/폴리글랏읽기/쓰기 저장소를 분리하고 기술을 달리 쓸 수 있음" 서로 다른 데이터 스토어/기술 사용 " 을 공식적으로 제시.
이벤트 기반 갱신쓰기 이벤트를 읽기 모델의 프로젝션/뷰 갱신에 사용Materialized View·ES 문서의 조합 패턴.
최종 일관성분리된 저장소 간 비동기 동기화로 잠시 지연 허용Azure 가 ‘Eventual consistency’ 트레이드오프를 명시.
보안/검증 경계쓰기 측에 검증·권한 집중, 읽기 측은 읽기 전용 DTOAzure 가 보안 이점과 역할 분리를 명시.
CQS 기반" 질의는 상태를 바꾸지 않음 " 원칙을 시스템 레벨로 확장Fowler CQS 가 사상적 배경을 제공.

핵심 이론 (Core Theory)

핵심 설계 원칙

CQRS 는 도로 두 개를 까는 설계다. 한 도로 (쓰기) 는 신호·검문이 빡세고, 다른 도로 (읽기) 는 고속도로처럼 빠르다.
쓰기 도로에서 난 변화는 이벤트로 읽기 도로에 전달돼 안내표지 (프로젝션) 가 갱신된다. 다만 표지 갱신이 살짝 늦을 수 있음 (최종 일관성) 을 감수하면, 확장성과 성능을 크게 얻는다.

원칙목적필요한 이유
책임 분리 (CQS)읽기/쓰기 부수효과·성능 요구 분리한 모델로 두 요구 충족 시 상충·결합 ↑
단일 책임·관심사 분리쓰기=규칙·검증, 읽기=조회 최적화변경 이유 분리로 충돌·복잡도 ↓
독립 최적화·스케일각 워크로드별 성능·비용 최적읽기 편중/쓰기 편중을 개별 확장
이벤트 기반 갱신읽기 모델 (프로젝션) 최신화모델 분리로 직접 조인 불가→이벤트 전파
최종 일관성성능·가용성 확보비동기 동기화로 전파 지연 수용
폴리글랏 저장소모델별 최적 DB 선택데이터 특성별 강점 활용
보안·검증 경계변경 경로에 강한 검증·인가공격면 축소·감사 용이
단순함 우선과설계 방지복잡성·운영 비용 상쇄

핵심은 분리이고, 효과는 독립 최적화·확장이다. 읽기는 프로젝션으로 빠르게, 쓰기는 규칙·검증을 두껍게. 저장소는 필요하면 다르게, 일관성은 최종적으로 맞춘다. 단, 복잡성 비용이 정당화될 때만 쓴다.

기본 원리 및 동작 메커니즘

CQRS 는 “쓰기랑 읽기를 갈라서, 각자 잘하게 만든다” 가 전부다. 쓰기 경로는 규칙 검증과 트랜잭션에 집중하고, 읽기 경로는 빠른 응답과 맞춤형 뷰에 집중한다. 쓰기 후 이벤트를 흘려 보내 프로젝션이 읽기 DB 를 갱신하기 때문에, 조회 데이터는 약간 늦게 반영될 수 있다 (대신 속도·확장성이 좋아진다).
Event Sourcing 을 더하면 이벤트로 상태를 재구성·감사할 수 있지만 꼭 필요하진 않다.

기본 원리
원리요지구현 포인트
책임 분리Command/Query 모델·핸들러·저장소 분리CommandHandler/QueryHandler, WriteDB/ReadDB 분리
CQS 기반Command=상태 변경, Query=부작용 없음메서드/엔드포인트 계층에서 원칙 준수
물질화 뷰이벤트로 ReadDB 갱신 (프로젝션)비정규화/캐시/리드뷰, 재빌드 가능
일관성 모델Eventual consistency 허용읽기 지연·보상 로직·리드 모델 랙 모니터링
선택적 ESES 는 옵션(감사·재생 강화)Event Store, 스냅샷·리플레이 전략
적용 판단이득>복잡성일 때 채택조회 패턴 다양/스케일 요구 뚜렷할 때 유리
동작 메커니즘
sequenceDiagram
    participant Client
    participant CmdAPI as Command API
    participant CmdHandler as Command Handler
    participant WriteDB as Write DB / Aggregate
    participant EvtBus as Event Bus (옵션: Event Store)
    participant Proj as Projection / ReadModel Updater
    participant ReadDB as Read DB (Materialized View)
    participant QryAPI as Query API
    participant QryHandler as Query Handler

    Client->>CmdAPI: Command 전송
    CmdAPI->>CmdHandler: 검증/도메인 규칙 실행
    CmdHandler->>WriteDB: 상태 변경(트랜잭션)
    CmdHandler-->>EvtBus: 이벤트 발행(필요 시)
    EvtBus-->>Proj: 이벤트 전달
    Proj->>ReadDB: 읽기 모델 갱신(비동기)

    Client->>QryAPI: Query 전송
    QryAPI->>QryHandler: 질의 처리
    QryHandler->>ReadDB: 물질화 뷰 조회
    ReadDB-->>QryHandler: 결과
    QryHandler-->>Client: 응답
graph TD
    User[사용자] --> API
    API --> CommandHandler[명령 핸들러]
    CommandHandler --> WriteDB[쓰기DB]
    API --> QueryHandler[조회 핸들러]
    QueryHandler --> ReadDB[조회DB]
    WriteDB --> EventBus[이벤트 버스]
    EventBus --> ReadDB

아키텍처 및 구성 요소

CQRS 는 쓰기 (정확성)읽기 (속도)서로 다른 모델과 저장소로 나눠 각자 최적화하는 설계다.
쓰기에서 바뀐 내용은 이벤트/배치로 읽기 모델에 반영되어, 전체 시스템은 보통 최종 일관성으로 동작한다. 신뢰성·분산 일관성은 OutboxSaga로 보강한다.

graph TB
  %% 외부 인터페이스
  U[Client/UI] -->|Commands| CMDAPI[Command API/Bus]
  U -->|Queries| QAPI[Query API/Bus]

  %% Command Side
  subgraph "Command Side (Write)"
    CH[Command Handler<br/>- 검증·도메인 규칙·트랜잭션]
    WM[Write Model]
    WDB[(Write DB)]
    CH --> WM --> WDB
  end
  CMDAPI --> CH

  %% 신뢰성 보완
  subgraph Reliability
    OX[Outbox Table]
    RELAY[Outbox Relay]
  end
  WDB -. 동일 트랜잭션 .-> OX
  OX --> RELAY

  %% 이벤트/프로젝션
  subgraph Eventing
    MB[(Event Bus/Broker)]
    ES[(Event Store)]
    PROJ[Projection/Read Model Updater]
  end
  RELAY --> MB
  MB --> ES
  MB --> PROJ

  %% Query Side
  subgraph "Query Side (Read)"
    RM[Read Model<br/>- 비정규화/인덱스/캐시]
    RDB[(Read DB/Cache)]
    QH[Query Handler]
    PROJ --> RM --> RDB
    QAPI --> QH --> RDB
  end

  %% 분산 트랜잭션
  subgraph "Coordination (Optional)"
    SAGA[Saga Orchestrator/Choreography]
  end
  CH -. cross-service commands .-> SAGA
  SAGA -. emits .-> MB
구성 요소
구성 요소핵심 역할대표 기술/패턴품질 지표 예시
Command Handler명령 검증·도메인 규칙·트랜잭션계층형/DDD 애그리게이트쓰기 p95, 실패율
Write Model/DB무결성·동시성 관리RDBMS/도큐먼트Deadlock/Retry 율
Outbox/Relay이벤트 발행 원자화Outbox 테이블·릴레이미전송 이벤트 수
Event Bus비동기 전달Kafka/RabbitMQ지연/중복률
Event Store이벤트 영속EventStoreDB 등재생 시간
Projection읽기 모델 생성Stream processor뷰 갱신 지연
Read Model/DB조회 최적화NoSQL/캐시/뷰읽기 p95/히트율
Query Handler질의 처리API/GraphQL/GRPC오류율
Saga분산 쓰기 조율Orchestrator/Choreo보상률/타임아웃

주요 기능과 역할

CQRS 는 프로그램에서 " 데이터 바꾸는 부분 " 과 " 데이터 보는 부분 " 을 완전히 다른 구조로 설계하는 방법이다.

영역구성 요소주요 기능개선·해결 효과
명령 처리 (Command Side)Command Model, Command Handler비즈니스 규칙 검증, 상태 변경, 트랜잭션 처리, 이벤트 발행데이터 무결성·일관성 확보, 도메인 규칙 강화
조회 처리 (Query Side)Query Model, Query Handler성능 최적화 조회, 다양한 View 제공, 캐싱·비정규화 활용응답 속도 향상, 읽기 전용 확장성 강화
이벤트 관리·프로젝션Event Bus, Projection, Event Store(선택)이벤트 발행·전달, 읽기 모델 업데이트, 감사 추적최종 일관성 유지, 결합도 감소, 장애 복구 용이

CQRS 의 Command 는 무결성과 규칙 준수를, Query 는 조회 성능과 확장성을, Event 관리·프로젝션은 양측 간 연결과 일관성 유지를 담당한다.

특성 분석 (Characteristics Analysis)

장점 및 이점

CQRS 는 도로를 두 개로 가르는 설계다.
한쪽 (쓰기) 은 규칙·검증이 빡센 도로, 다른 쪽 (읽기) 은 고속도로다. 고속도로 표지판 (프로젝션/뷰) 은 쓰기에서 온 이벤트로 갱신된다. 그래서 읽기를 크게 늘려도 쓰기가 버티고, 필요하면 두 도로를 따로 확장하거나 다른 DB를 쓸 수 있다.
단, 표지판 갱신이 약간 늦는 최종 일관성은 설계·UX 로 흡수해야 한다.

구분항목설명
성능조회 성능 향상읽기 모델을 프로젝션/머티리얼라이즈드 뷰로 구성해 복잡 조인 없이 고속 응답
성능쓰기 성능·안정성쓰기 모델이 도메인 규칙·트랜잭션에 집중하여 락 경합·조회 부담 감소
확장성독립적 스케일링읽기/쓰기 파이프라인을 별도로 수평 확장
확장성폴리글랏 퍼시스턴스읽기/쓰기에 다른 저장소·스키마 (예: SQL/NoSQL) 적용
유지보수관심사 분리쓰기=검증·규칙, 읽기=DTO/뷰로 분리되어 변경 충돌·복잡도 감소
보안권한 경계 강화쓰기 엔드포인트에만 변경 권한을 집중, 읽기는 최소 권한 조회
조직병렬 개발·배포읽기/쓰기 컴포넌트를 분리해서 팀 단위 병렬 개발/배포
운영 (조합)감사/재구성 (ES 결합)이벤트 로그 재생으로 읽기 모델 재생성·감사 추적

핵심은 분리다. 분리 덕에 읽기는 빠르게, 쓰기는 엄격하게 다듬을 수 있고, 각자 따로 확장·따로 DB를 선택한다. 감사·재구성은 ES 를 곁들였을 때 특히 강력해진다.

단점 및 제약사항과 해결방안

CQRS 의 단점은 한마디로 ” 복잡해지고, 읽기가 늦게 반영될 수 있다 “ 이다. 그래서 이득이 확실한 경우(읽기/쓰기 부하·요구가 매우 다를 때) 만 쓴다. 신뢰성은 Outbox(이중쓰기 제거) 와 멱등 소비자(중복 안전), Saga(분산 보상) 로 다지고, ES 를 쓰면 스냅샷/아카이빙으로 크기와 복구 시간을 관리한다.

범주단점/문제원인영향탐지·진단예방해결 기법/패턴
복잡성/학습구성요소·경로 증가모델/DB/브로커 분리개발·운영 비용↑아키텍처 리뷰단계적 도입·경계 명확화" 필요할 때만 " 채택 (가이드 준수)
최종 일관성읽기 지연/불일치비동기 프로젝션UX 혼란읽기 랙 모니터링기대치·SLA 명시UI 지연 표시·보상 트랜잭션 (사가)
인프라 비용다중 DB·브로커기술 스택 확대장애 지점↑·비용↑비용/장애 지표IaC·관리형 서비스아키텍처 단순화·리소스 상한
테스트 복잡경로별 검증 필요흐름 분리테스트 시간↑계약/통합 테스트표준 템플릿리플레이/계약 테스트 체계화
메시징 실패/중복/순서재시도·중복소비·오더링분산·네트워크 특성상태 불일치중복률/오프셋 랙파티셔닝·키 설계멱등 소비자, 트랜잭션/EOS 활용
이중쓰기DB 쓰기↔이벤트 발행 사이 실패분산 경계데이터·이벤트 불일치발행 실패율로컬 트랜잭션 우선Transactional Outbox + CDC
스키마 진화이벤트/뷰 버전 불일치장기 진화역직렬화 오류호환성 테스트스키마 레지스트리업/다운캐스터·버저닝 전략
ES 팽창/복구 지연이벤트 무한 적재ES 도입저장비용↑·리플레이 지연스토리지/리플레이 시간보존정책스냅샷/아카이빙

핵심 리스크는 복잡성·EC·메시징 불확실성이고, 실무 기본값은 Outbox(이중쓰기 제거) + 멱등 소비자 (중복 안전) + Saga(보상) 다. ES 를 쓰면 스냅샷/아카이빙·스키마 버저닝이 생존 전략이다.

트레이드오프 관계 분석

CQRS 는 쓰기=정확성, 읽기=속도를 따로 최적화해 성능과 확장을 얻는 대신, 복잡도와 운영 부담을 치른다.
강한 즉시 일관성이 필요한 기능은 동기화·RPC 를, 대량 조회·피드·검색은 비동기·이벤트·폴리글랏으로 가져가고, 신뢰성은 Outbox, 분산 일관성은 Saga, UX 는 낙관적 UI로 메운다.

트레이드오프 축선택지장점단점적용 기준관련 패턴/완화
성능/확장성 ↔ 복잡도CRUD단순, 운영 쉬움확장/튜닝 충돌소규모, 즉시일관성 강함-
CQRS독립 최적화·확장구조·운영 복잡R≫W, 질의 다양프로젝션/캐시
일관성 ↔ 확장성동기 반영최신성 보장지연·결합↑결제·재고 등트랜잭션/RPC
비동기 반영처리량·내결함성↑RYW 미보장피드/리포트낙관적 UI/세션 일관성
신뢰성 ↔ 단순성직접 발행구현 간단유실·중복 위험프로토타입-
Outbox원자성·재시도·멱등릴레이 운영 부담프로덕션멱등 키/DLQ
일관성 ↔ 유연성단일 트랜잭션ACID 롤백경계 확장 불가모놀리식-
Saga분산 일관성·확장보상·타임아웃 복잡다 서비스 쓰기오케스트레이션/코레오
유연성 ↔ 운영 부담단일 DB운영 단순튜닝 충돌단순 도메인-
폴리글랏목적별 최적화거버넌스·비용↑대규모/다양 질의스키마 버전닝/백필
결합도/지연 ↔ 복잡도동기 RPC단순·예측가능스파이크 취약강한 상호의존Circuit Breaker
이벤트 구독느슨 결합·버퍼링순서·중복 처리비동기 허용멱등성/리플레이
graph LR
    A[성능 향상] <--> B[복잡성 증가]
    C[확장성] <--> D[일관성 약화]
    E[유연성] <--> F[운영 부담]
    G[개발 자유도] <--> H[통합 복잡도]

구현 및 분류 (Implementation & Classification)

구현 기법 및 방법

분류정의구성 요소원리목적사용 상황특징
단일 저장소 CQRS한 DB 로 쓰기/읽기 로직·모델만 분리Command/Query Handler, Shared DB쓰기·읽기 모델 분리 (논리), 동일 저장소구조적 분리 도입, 저비용 시작초기 단계, 단일 서비스단순·낮은 운영비, 이행 쉬움. 필요 시 분리 저장소로 확장
분리된 저장소 CQRS쓰기/읽기 DB 분리Write DB(RDB), Read DB(NoSQL/캐시), 동기화쓰기 후 이벤트→읽기 저장소 갱신독립 확장, 쿼리 성능 최적화읽기 비율↑, 고성능 UI스키마 다변화·확장 용이, 최종 일관성 설계 필요
ES + CQRS이벤트 스토어가 진실원, 이벤트 재생으로 읽기 모델Event Store, Aggregate, Projection변경을 이벤트로 영속, 재생으로 상태/뷰 구성감사/시간여행/재구성규제/감사, 복잡한 도메인이력 완전, 복잡도↑·스냅샷/재생 비용 고려
Outbox + CDCDB 트랜잭션 안에 이벤트 기록, CDC 로 브로커 전송Outbox Table, CDC(Debezium 등), Broker“DB 쓰기 = Outbox 기록 " 원자화 → CDC이중 쓰기/유실 방지Kafka·Debezium 환경일관성↑, 지연 약간↑, 표준화 쉬움
파티션 키 설계키별 파티션 고정으로 파티션 내 순서 보장Producer(Key), Topic/Partition동일 키→동일 파티션, 파티션 내 순서상태 전이 순서 보장주문/계정 등 엔티티별 이벤트글로벌 순서 X, 키 선정이 관건

단일 DB 로 논리 분리부터 시작하고, 필요 시 저장소 분리로 확장한다. 이벤트 소싱은 감사/재생이 필요할 때 채택한다. 정합성은 Outbox+CDC 로, 순서는 파티션 키로 보장한다. 단순 도메인엔 과도 설계를 피한다.

단일 저장소 + Outbox
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# pip install fastapi uvicorn sqlalchemy psycopg2-binary pydantic
# 핵심: 동일 트랜잭션 내에 '도메인 쓰기'와 'outbox 이벤트 기록'을 함께 커밋
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, JSON, text
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_engine("postgresql://user:pass@localhost/app", future=True)
Session = sessionmaker(bind=engine, expire_on_commit=False, future=True)
Base = declarative_base()

class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    customer_id = Column(String, nullable=False)
    status = Column(String, default="CREATED")

class Outbox(Base):
    __tablename__ = "outbox"
    id = Column(Integer, primary_key=True)
    aggregate_id = Column(String, nullable=False)
    type = Column(String, nullable=False)          # e.g. "OrderCreated"
    payload = Column(JSON, nullable=False)

class CreateOrderCommand(BaseModel):
    customer_id: str

app = FastAPI()

@app.post("/orders")
def create_order(cmd: CreateOrderCommand):
    with Session.begin() as s:  # 1) 트랜잭션 시작
        order = Order(customer_id=cmd.customer_id)
        s.add(order)

        # 2) 같은 트랜잭션에서 Outbox 레코드 삽입 (정합성 보장)
        evt = Outbox(
            aggregate_id=str(order.id),
            type="OrderCreated",
            payload={"orderId": order.id, "customerId": order.customer_id},
        )
        s.add(evt)

    # 3) Debezium Outbox Router가 outbox 테이블 CDC→Kafka로 게시(비동기)
    return {"orderId": order.id, "status": "CREATED"}
분리 저장소 + 파티션 키 + 프로젝션
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// npm i kafkajs mongodb
// 핵심: (1) Producer가 "주문ID"를 파티션 키로 사용 → 파티션 내 순서 보장
//      (2) Consumer가 이벤트를 받아 MongoDB 읽기 모델을 업데이트(프로젝션)

import { Kafka } from "kafkajs";
import { MongoClient } from "mongodb";

// 1) Producer: 파티션 키=orderId
async function publishOrderEvent(broker: string, orderId: string, payload: any) {
  const kafka = new Kafka({ clientId: "orders", brokers: [broker] });
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({
    topic: "order.events",
    messages: [{ key: orderId, value: JSON.stringify(payload) }], // 파티션 키
  });
  await producer.disconnect();
}

// 2) Consumer/Projection: MongoDB 읽기 모델 업데이트
async function startProjection(broker: string, mongoUri: string) {
  const kafka = new Kafka({ clientId: "orders-projection", brokers: [broker] });
  const consumer = kafka.consumer({ groupId: "orders-read-model" });
  const mongo = new MongoClient(mongoUri);
  await mongo.connect();
  const view = mongo.db("read").collection("order_views");

  await consumer.connect();
  await consumer.subscribe({ topic: "order.events", fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());
      // 이벤트 타입에 따라 읽기 모델 비정규화/업데이트
      if (event.type === "OrderCreated") {
        await view.updateOne(
          { orderId: event.orderId },
          { $set: { orderId: event.orderId, customerId: event.customerId, status: "CREATED" } },
          { upsert: true }
        );
      }
      if (event.type === "OrderPaid") {
        await view.updateOne({ orderId: event.orderId }, { $set: { status: "PAID" } });
      }
    },
  });
}

// 사용 예시
// publishOrderEvent("localhost:9092", "order-123", { type: "OrderCreated", orderId: "order-123", customerId: "c-1" });
// startProjection("localhost:9092", "mongodb://localhost:27017").then(() => console.log("projection running"));

분류 기준에 따른 유형 구분

CQRS 를 레고 블록처럼 생각하자.

  1. 먼저 분리 수준 블록 (Type 1→2→3) 중 가능한 수준을 고른다.
  2. 다음 저장소 구성(단일↔이중↔폴리글랏) 을 고르고,
  3. 일관성 방식(동기↔비동기) 을 정한다.
  4. 필요하면 이벤트 블록(Outbox/ES) 까지 붙인다.
    → 이렇게 조합해 지금의 요구(트래픽, 팀, 규정) 에 맞춘다.
분류 축유형핵심 설명언제 쓰나트레이드오프/주의
분리 수준Type 1: 클래스 분리명령/조회 클래스를 분리 (같은 모델)초기 진입, 코드 충돌 완화성능 이득 제한.
Type 2: 모델 분리명령/조회 도메인 모델 분리, 동일 DB읽기/쓰기 변경 이유가 뚜렷할 때스키마/코드 중복 증가.
Type 3: 저장소 분리Write-Read 저장소 분리 (읽기는 프로젝션)대규모 조회/확장성 최우선최종 일관성·동기화 파이프라인 필요.
저장소 구성단일 DB CQRS같은 DB, 읽기용 뷰/테이블·인덱스초기/운영 단순화읽기 확장 한계.
이중 DB CQRSWrite/Read DB 분리읽기 폭증, 비용 최적화복제/동기화 운영 필요.
폴리글랏서로 다른 DB 기술 믹스워크로드별 최적화복잡한 운영·보안 거버넌스.
일관성동기요청 내 읽기 즉시 반영강한 일관성·소규모확장성 제한.
비동기 (최종)이벤트로 프로젝션 갱신대규모·고처리량 시스템지연·재시도/멱등성 설계 필요.
이벤트 결합Outbox+CDC이중쓰기 방지, 신뢰성↑다중 스토어 동기화CDC 인프라 운영.
CQRS+ES이벤트 스토어 + 프로젝션 재생감사/이력 필수 도메인설계/운영 복잡성↑.
적용 범위도메인 단위특정 바운디드 컨텍스트만 적용점진 도입경계 설계 필요.
시스템 전역전 서비스로 확장공통 거버넌스 용이락인·조직 복잡성↑.

핵심은 분리 수준을 적정선으로 잡고(Type 1→3), 그 위에 저장소 구성일관성 방식을 조합하는 것이다. 비동기 + 프로젝션이 대규모에 흔하고, Outbox/ES는 필요 시점에만 더해 운영 복잡성을 통제한다.

실무 적용 (Practical Application)

실습 예제 및 코드 구현

간단한 주문 시스템에서 CQRS 를 적용

시나리오: 간단한 주문 시스템에서 CQRS 를 적용해 쓰기/읽기 분리

시스템 구성:

시스템 구성 다이어그램:

graph TB
    C[Client] --> CA[Command API]
    CA --> CH[Command Handler]
    CH --> WDB[Write DB]
    CH --> EB[Event Bus]
    EB --> QH[Query Handler]
    QH --> RDB[Read DB]
    C --> QA[Query API]
    QA --> RDB

Workflow:

  1. Client 가 주문 생성 요청 (Command) 전송
  2. Command Handler 가 Write DB 저장 후 이벤트 발행
  3. Query Handler 가 이벤트 수신 후 Read DB 업데이트
  4. Client 가 Query API 로 조회 요청 → Read DB 응답

유무에 따른 차이점:

구현 예시 (Python/FastAPI + Redis):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Command API 예시
from fastapi import FastAPI
from kafka import KafkaProducer
import json

app = FastAPI()
producer = KafkaProducer(bootstrap_servers='localhost:9092')

@app.post("/orders")
def create_order(order: dict):
    # 1. Write DB에 저장 (간략화 예시)
    print(f"Saving order: {order}")
    # 2. 이벤트 발행
    producer.send('order_created', json.dumps(order).encode())
    return {"status": "order created"}

# Query API 예시
from fastapi import FastAPI
import redis

query_app = FastAPI()
cache = redis.Redis(host='localhost', port=6379)

@query_app.get("/orders/{order_id}")
def get_order(order_id: str):
    # Redis 캐시 조회
    data = cache.get(order_id)
    return json.loads(data) if data else {"error": "Not found"}
읽기 모델 (Projection) 을 Redis 와 OpenSearch 로 갱신

시나리오: " 주문 (Order) 생성→결제 승인→배송 시작 " 이벤트가 발생하면, 읽기 모델 (Projection) 을 Redis 와 OpenSearch 로 갱신하는 CQRS+Outbox 미니 시스템.

시스템 구성:

시스템 구성 다이어그램:

graph TB
  C[Client] --> CA["Command API(FastAPI)"]
  CA --> WDB[(PostgreSQL)]
  CA --> OUTBOX[(outbox)]
  Deb[Debezium CDC] --> K[Kafka]
  K --> PC[Projection Consumer]
  PC --> R[Redis - Read Model]
  C --> QA["Query API(FastAPI)"]
  QA --> R

Workflow:

  1. Command API 가 쓰기 트랜잭션으로 ordersoutbox 에 기록 (이중 쓰기 방지)
  2. Debezium 이 outbox 변화 감지 → Kafka 토픽 발행
  3. Projection Consumer 가 이벤트를 수신하여 Redis 의 읽기 모델 업데이트
  4. Query API 는 Redis 에서 빠른 조회 응답

유무에 따른 차이점:

구현 예시 (Python/SQL - 핵심 부분 주석 포함)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# FastAPI Command API: 트랜잭션 내 본데이터+Outbox에 함께 기록하여 이중 쓰기 문제 방지
from fastapi import FastAPI
import psycopg2, json, uuid, time

app = FastAPI()
conn = psycopg2.connect("dbname=orders user=app password=secret host=localhost")

@app.post("/orders")
def create_order(payload: dict):
    order_id = str(uuid.uuid4())
    evt = {
        "event_id": str(uuid.uuid4()),
        "type": "OrderCreated",
        "aggregate_id": order_id,
        "occurred_at": int(time.time()),
        "data": payload,
        "version": 1
    }
    with conn:
        with conn.cursor() as cur:
            # 1) 본 데이터 저장 (쓰기 모델)
            cur.execute("INSERT INTO orders(id, status, total_amount) VALUES (%s, %s, %s)",
                        (order_id, "CREATED", payload["total_amount"]))
            # 2) Outbox에 이벤트 기록 (동일 트랜잭션)
            cur.execute("""INSERT INTO outbox(event_id, aggregate_id, event_type, payload, occurred_at, status)
                           VALUES (%s,%s,%s,%s, to_timestamp(%s), 'PENDING')""",
                        (evt["event_id"], evt["aggregate_id"], evt["type"], json.dumps(evt), evt["occurred_at"]))
    return {"order_id": order_id, "status": "created"}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
-- PostgreSQL 테이블 (쓰기 모델 + Outbox)
CREATE TABLE orders(
  id uuid PRIMARY KEY,
  status text NOT NULL,
  total_amount numeric NOT NULL
);

CREATE TABLE outbox(
  event_id uuid PRIMARY KEY,
  aggregate_id uuid NOT NULL,
  event_type text NOT NULL,
  payload jsonb NOT NULL,
  occurred_at timestamp NOT NULL,
  status text NOT NULL
);
-- Debezium은 outbox 테이블의 변경을 캡처하여 Kafka로 전달 (CDC)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Projection Consumer (Kafka -> Redis): 동일 aggregate_id 키를 사용해 순서 보장(단일 파티션 가정)
from kafka import KafkaConsumer
import redis, json

r = redis.Redis(host="localhost", port=6379)
consumer = KafkaConsumer(
    "orders-events",
    bootstrap_servers=["localhost:9092"],
    group_id="projection-readmodel",
    enable_auto_commit=False)

for msg in consumer:
    evt = json.loads(msg.value)
    agg_id = evt["aggregate_id"]  # 파티션 키로 설정되어 있다고 가정
    # 이벤트 타입별 Projection 갱신
    if evt["type"] == "OrderCreated":
        r.hset(f"order:{agg_id}", mapping={"status":"CREATED", "total": evt["data"]["total_amount"]})
    elif evt["type"] == "OrderPaid":
        r.hset(f"order:{agg_id}", mapping={"status":"PAID"})
    elif evt["type"] == "OrderShipped":
        r.hset(f"order:{agg_id}", mapping={"status":"SHIPPED"})
    consumer.commit()  # at-least-once, idempotent 갱신 로직 권장
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# FastAPI Query API: 읽기 전용(Projection) 경로
from fastapi import FastAPI
import redis, json

q = FastAPI()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@q.get("/orders/{order_id}")
def get_order(order_id: str):
    data = r.hgetall(f"order:{order_id}")
    return data if data else {"error": "not-found"}
게시글 등록 및 조회 서비스

시나리오: 게시글 등록 및 조회 서비스 (도메인별 CQRS 패턴 구현)

시스템 구성:

시스템 구성 다이어그램:

graph TB
    User[사용자] --> API
    API --> CommandService
    API --> QueryService
    CommandService --> WriteDB
    CommandService --> EventBus[Kafka/EventBus]
    EventBus --> QueryService
    QueryService --> ReadDB

Workflow:

  1. 사용자가 게시글 등록 요청 (Command)
  2. CommandService 가 처리 후 WriteDB 저장/이벤트 발행
  3. QueryService 가 이벤트를 감지, 게시글 정보 갱신
  4. 사용자는 실시간으로 게시글을 조회 (쿼리)

핵심 역할: 독립적 읽기/쓰기 구조로 실시간 데이터 일관성 보장

유무에 따른 차이점:

구현 예시 (Python)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# commands.py
class CreatePostCommand:
    """게시글 생성 명령 객체 - CQRS 패턴의 명령 모델 역할"""
    def __init__(self, title, content):
        self.title = title
        self.content = content

# handlers.py
class CommandHandler:
    """명령 처리 핸들러 - 게시글 생성 로직 분리"""
    def handle(self, command):
        # 비즈니스 로직 수행, DB에 저장, 이벤트 발행
        save_to_write_db(command)
        publish_event("PostCreated", command)

# queries.py
class GetPostQuery:
    """게시글 조회 쿼리 객체 - CQRS 패턴의 조회 모델 역할"""
    def __init__(self, post_id):
        self.post_id = post_id

class QueryHandler:
    """조회 처리 핸들러 - 읽기 전용 DB 또는 캐시 조회"""
    def handle(self, query):
        return fetch_from_read_db(query.post_id)
전자상거래 주문 관리 시스템

시나리오: 전자상거래 주문 관리 시스템

시스템 구성:

시스템 구성 다이어그램:

graph TB
    Client[클라이언트] --> API[API 게이트웨이]
    
    API --> CMD[주문 명령 API]
    API --> QRY[주문 조회 API]
    
    CMD --> OrderService[주문 서비스]
    QRY --> QueryService[조회 서비스]
    
    OrderService --> PostgreSQL[(PostgreSQL)]
    OrderService --> Kafka[Apache Kafka]
    
    Kafka --> Projection[프로젝션 서비스]
    Projection --> Redis[(Redis)]
    Projection --> Elasticsearch[(Elasticsearch)]
    
    QueryService --> Redis
    QueryService --> Elasticsearch

Workflow:

  1. 클라이언트가 주문 생성 요청
  2. 주문 서비스가 비즈니스 로직 검증 후 PostgreSQL 에 저장
  3. 주문 생성 이벤트를 Kafka 로 발행
  4. 프로젝션 서비스가 이벤트를 수신하여 Redis 와 Elasticsearch 업데이트
  5. 조회 요청은 Redis/Elasticsearch 에서 처리

핵심 역할:

유무에 따른 차이점:

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# 1. 도메인 모델 (Domain Model)
from dataclasses import dataclass
from typing import List
from datetime import datetime

@dataclass
class Order:
    """주문 애그리게이트 (Order Aggregate)"""
    id: str
    customer_id: str
    items: List['OrderItem']
    status: str
    created_at: datetime
    total_amount: float

    def calculate_total(self) -> float:
        """총 주문 금액 계산 - 도메인 로직"""
        return sum(item.price * item.quantity for item in self.items)

    def confirm(self):
        """주문 확정 - 비즈니스 규칙 적용"""
        if self.status != 'pending':
            raise ValueError("이미 처리된 주문입니다")
        self.status = 'confirmed'

# 2. 명령 핸들러 (Command Handler)
class CreateOrderCommandHandler:
    def __init__(self, repository, event_bus):
        self.repository = repository  # PostgreSQL 저장소
        self.event_bus = event_bus    # Kafka 이벤트 버스

    async def handle(self, command: 'CreateOrderCommand'):
        """주문 생성 명령 처리"""
        # 도메인 객체 생성 및 검증
        order = Order(
            id=command.order_id,
            customer_id=command.customer_id,
            items=command.items,
            status='pending',
            created_at=datetime.now(),
            total_amount=0
        )
        
        # 비즈니스 로직 실행
        order.total_amount = order.calculate_total()
        order.confirm()
        
        # 영속성 저장 (PostgreSQL)
        await self.repository.save(order)
        
        # 이벤트 발행 (비동기 처리를 위한 이벤트)
        await self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            total_amount=order.total_amount,
            items=order.items
        ))

# 3. 조회 핸들러 (Query Handler)
class OrderQueryHandler:
    def __init__(self, redis_client, elasticsearch_client):
        self.redis = redis_client           # 빠른 조회용
        self.elasticsearch = elasticsearch_client  # 검색용

    async def get_order_summary(self, order_id: str):
        """주문 요약 조회 - Redis에서 빠른 조회"""
        # Redis에서 비정규화된 주문 요약 데이터 조회
        order_data = await self.redis.hgetall(f"order:{order_id}")
        return {
            'order_id': order_data.get('id'),
            'customer_name': order_data.get('customer_name'),
            'total_amount': float(order_data.get('total_amount', 0)),
            'status': order_data.get('status'),
            'item_count': int(order_data.get('item_count', 0))
        }

    async def search_orders(self, customer_id: str, query: str):
        """주문 검색 - Elasticsearch에서 검색"""
        search_body = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"customer_id": customer_id}},
                        {"multi_match": {
                            "query": query,
                            "fields": ["items.name", "status"]
                        }}
                    ]
                }
            }
        }
        
        result = await self.elasticsearch.search(
            index="orders", 
            body=search_body
        )
        return [hit['_source'] for hit in result['hits']['hits']]

# 4. 프로젝션 핸들러 (Projection Handler)
class OrderProjectionHandler:
    def __init__(self, redis_client, elasticsearch_client, customer_service):
        self.redis = redis_client
        self.elasticsearch = elasticsearch_client
        self.customer_service = customer_service

    async def handle_order_created(self, event: 'OrderCreatedEvent'):
        """주문 생성 이벤트 처리 - 읽기 모델 업데이트"""
        
        # 고객 정보 조회 (외부 서비스 호출)
        customer = await self.customer_service.get_customer(event.customer_id)
        
        # Redis에 주문 요약 저장 (빠른 조회용)
        order_summary = {
            'id': event.order_id,
            'customer_id': event.customer_id,
            'customer_name': customer.name,
            'total_amount': str(event.total_amount),
            'status': 'confirmed',
            'item_count': str(len(event.items)),
            'created_at': datetime.now().isoformat()
        }
        
        await self.redis.hset(f"order:{event.order_id}", mapping=order_summary)
        
        # Elasticsearch에 검색용 문서 저장
        search_document = {
            'order_id': event.order_id,
            'customer_id': event.customer_id,
            'customer_name': customer.name,
            'total_amount': event.total_amount,
            'status': 'confirmed',
            'items': [{'name': item.name, 'category': item.category} 
                     for item in event.items],
            'created_at': datetime.now()
        }
        
        await self.elasticsearch.index(
            index="orders", 
            id=event.order_id, 
            body=search_document
        )

# 5. API 엔드포인트 (FastAPI 예시)
from fastapi import FastAPI, HTTPException

app = FastAPI()

# 명령 API 엔드포인트
@app.post("/orders")
async def create_order(command: CreateOrderCommand):
    """주문 생성 - 명령 처리"""
    try:
        await order_command_handler.handle(command)
        return {"message": "주문이 성공적으로 생성되었습니다", "order_id": command.order_id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# 조회 API 엔드포인트  
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """주문 조회 - 쿼리 처리"""
    order = await order_query_handler.get_order_summary(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="주문을 찾을 수 없습니다")
    return order

@app.get("/customers/{customer_id}/orders/search")
async def search_customer_orders(customer_id: str, q: str = ""):
    """고객 주문 검색 - 복잡한 쿼리 처리"""
    orders = await order_query_handler.search_orders(customer_id, q)
    return {"orders": orders, "total": len(orders)}
CQRS + 이벤트 소싱 + 사가 패턴 통합 아키텍처 실전 예시

시나리오: 대형 이커머스에서 결제와 배송을 포함한 주문 처리 프로세스를 여러 서비스에 걸친 분산 트랜잭션으로 관리한다.

시스템 구성:

graph LR
    User[사용자] --> APIGateway[API Gateway]
    APIGateway --> OrderCmdSvc[Order Command Service]
    OrderCmdSvc-->|OrderCreatedEvent|EventBus(Kafka)
    EventBus-->|OrderSagaStart|SagaCoordinator
    SagaCoordinator-->|PaymentCommand|PaymentCmdSvc[Payment Command Service]
    PaymentCmdSvc-->|PaymentApprovedEvent|EventBus
    SagaCoordinator-->|InventoryCommand|InventoryCmdSvc[Inventory Command Service]
    InventoryCmdSvc-->|InventoryReservedEvent|EventBus
    SagaCoordinator-->|DeliveryCommand|DeliveryCmdSvc[Delivery Command Service]
    DeliveryCmdSvc-->|DeliveryScheduledEvent|EventBus
    SagaCoordinator-->|OrderCompletedEvent|EventBus

Workflow:

  1. 사용자 주문 요청 (Command)
  2. Order Command 서비스가 주문 생성 후 OrderCreatedEvent(주문 생성 이벤트) 발행
  3. Saga Coordinator(사가 조정자) 가 이벤트 수신, 결제/재고/배송 등 하위 서비스 명령 분산
  4. 각 서비스가 처리 후 PaymentApprovedEvent(결제 승인 이벤트), InventoryReservedEvent(재고 예약 이벤트), DeliveryScheduledEvent(배송 예약 이벤트) 등을 이벤트 버스로 발행
  5. Saga Coordinator 가 이벤트 종합 후, 최종적으로 OrderCompletedEvent(주문 완료 이벤트) 발행 또는 실패 시 보상 트랜잭션 (Compensation) 수행

사가 패턴의 통합 처리 Event Flow 설명:

  1. OrderCreatedEvent가 발생하면
    → Saga Coordinator(사가 조정자) 가 ApprovePaymentCommand를 결제 서비스로 전송
  2. 결제 성공 (PaymentApprovedEvent)
    → Saga Coordinator 가 ReserveInventoryCommand를 재고 서비스로 전송
  3. 재고 성공 (InventoryReservedEvent)
    → Saga Coordinator 가 ScheduleDeliveryCommand를 배송 서비스로 전송
  4. 모든 단계가 Success 일 때
    → Saga Coordinator 가 OrderCompletedEvent를 발행
    → 모든 서비스 Read DB 에 최종 상태 반영
  5. 단계별 실패 시 (예: 결제 승인 실패)
    → Saga Coordinator 가 보상 명령 (PaymentCompensationCommand 등) 을 발행
    → 이전 단계 변경분을 Rollback, 전체 주문 취소 이벤트 (OrderCancelledEvent) 발행

핵심 역할:

유무에 따른 차이점:

구현 예시:

  1. 사가 코디네이터 보상 로직 예시 (Saga Coordinator Compensation Logic)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    
    # saga_coordinator.py
    class SagaCoordinator:
        """사가 패턴 중심 주문 트랜잭션 처리"""
        def __init__(self):
            self.state = {}
    
        def handle_order_created(self, event):
            # 결제 서비스 명령 전송
            self.publish_command('payment', {'order_id': event.order_id, 'amount': event.amount})
    
        def handle_payment_approved(self, event):
            # 재고 서비스 명령 전송
            self.publish_command('inventory', {'order_id': event.order_id, 'items': event.items})
    
        def handle_inventory_reserved(self, event):
            # 배송 서비스 명령 전송
            self.publish_command('delivery', {'order_id': event.order_id, 'address': event.address})
    
        def handle_delivery_scheduled(self, event):
            # 주문 완료 이벤트 발행
            self.publish_event('order_completed', {'order_id': event.order_id})
    
        def handle_failure(self, stage, event):
            # 실패 단계에 따라 보상(Compensation) 트랜잭션 실행
            if stage == 'payment':
                # 결제 취소 로직
                self.publish_command('payment_compensate', {'order_id': event.order_id})
            elif stage == 'inventory':
                # 재고 복원 로직
                self.publish_command('inventory_compensate', {'order_id': event.order_id})
            elif stage == 'delivery':
                # 배송 예약 취소 로직
                self.publish_command('delivery_compensate', {'order_id': event.order_id})
    
        def publish_command(self, service, payload):
            # 각 서비스에 명령 메시지 전달
            pass  # 실제 구현은 Kafka/RabbitMQ 등 사용
    
        def publish_event(self, event_type, payload):
            # 마무리 이벤트 송신 (Kafka 등)
            pass
    
  2. 결제 서비스 (Payment Command/Query)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    
    # payment_command.py
    class ApprovePaymentCommand:
        """결제 승인 명령 - CQRS 명령 모델"""
        def __init__(self, order_id, amount):
            self.order_id = order_id
            self.amount = amount
    
    # payment_handler.py
    def handle_approve_payment(command: ApprovePaymentCommand):
        """결제 승인 처리 - 결제 DB 업데이트 + 이벤트 발행"""
        save_payment_to_db(command.order_id, command.amount)
        payment_event = {"order_id": command.order_id, "status": "APPROVED"}
        publish_event("payment_approved", payment_event)
    
    # payment_query.py
    class GetPaymentStatusQuery:
        """결제 상태 조회 쿼리 - CQRS 조회 모델"""
        def __init__(self, order_id):
            self.order_id = order_id
    
    def handle_get_payment_status(query: GetPaymentStatusQuery):
        """결제 상태 조회 실행"""
        return fetch_payment_status_from_read_db(query.order_id)
    
  3. 재고 서비스 (Inventory Command/Query)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    
    # inventory_command.py
    class ReserveInventoryCommand:
        """재고 예약 명령 - CQRS 명령 모델"""
        def __init__(self, order_id, items):
            self.order_id = order_id
            self.items = items
    
    def handle_reserve_inventory(command: ReserveInventoryCommand):
        """재고 예약 처리 - 재고 DB 업데이트 + 이벤트 발행"""
        for item in command.items:
            update_inventory_count(item["product_id"], -item["quantity"])
        inventory_event = {"order_id": command.order_id, "status": "RESERVED"}
        publish_event("inventory_reserved", inventory_event)
    
    # inventory_query.py
    class GetInventoryStatusQuery:
        """재고 상태 조회 쿼리"""
        def __init__(self, product_id):
            self.product_id = product_id
    
    def handle_get_inventory_status(query: GetInventoryStatusQuery):
        """조회 DB에서 재고 상태 반환"""
        return fetch_inventory_status_from_read_db(query.product_id)
    
  4. 배송 서비스 (Delivery Command/Query)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    # delivery_command.py
    class ScheduleDeliveryCommand:
        """배송 예약 명령 모델"""
        def __init__(self, order_id, address):
            self.order_id = order_id
            self.address = address
    
    def handle_schedule_delivery(command: ScheduleDeliveryCommand):
        """배송 예약 처리 - 배송 DB 기록 + 이벤트 발행"""
        save_delivery_schedule_to_db(command.order_id, command.address)
        delivery_event = {"order_id": command.order_id, "status": "SCHEDULED"}
        publish_event("delivery_scheduled", delivery_event)
    
    # delivery_query.py
    class GetDeliveryStatusQuery:
        """배송 상태 조회 쿼리"""
        def __init__(self, order_id):
            self.order_id = order_id
    
    def handle_get_delivery_status(query: GetDeliveryStatusQuery):
        """조회 DB에서 배송 상태 반환"""
        return fetch_delivery_status_from_read_db(query.order_id)
    
  5. 이벤트 소비 및 상태 반영 예시 (Query Side)

    1
    2
    3
    4
    5
    6
    7
    8
    
    # event_handler.py
    def handle_order_completed_event(event):
        """주문 완료 이벤트 수신 후 Read DB 갱신"""
        update_order_status_in_read_db(event['order_id'], status='COMPLETED')
    
    def handle_payment_approved_event(event):
        """결제 승인 이벤트 수신 후 Read DB 반영"""
        update_payment_status_in_read_db(event['order_id'], status='APPROVED')
    
    • 각 핸들러마다 CQRS 분리, 이벤트 소비 후 Read DB(Materialized View, 비정규화 DB) 빠른 상태 반영

실제 도입 사례

CQRS 도입은 읽기와 쓰기의 요구가 크게 다를 때 힘을 발휘한다. 주문·거래처럼 규칙과 트랜잭션이 중요한 쓰기 경로는 도메인 규칙에 맞춰 설계하고, 상품 검색·대시보드처럼 속도가 중요한 읽기 경로는 비정규화된 물질화 뷰로 최적화한다. 이벤트를 흘려 프로젝션이 읽기 뷰를 갱신하므로, 조회는 빠르되 약간의 지연이 있을 수 있다. 처음엔 단순 분리로 시작하고, 필요해지면 브로커·ES를 더해 확장한다.

도메인/목적전형 구성 (쓰기↔읽기)함께 쓰는 기술/패턴핵심 효과
이커머스 주문·검색 분리주문/결제=RDB 트랜잭션 ↔ 카탈로그/검색=물질화 뷰 (Elasticsearch/캐시)Kafka/Outbox, 프로젝션, 물질화 뷰피크 트래픽에서도 조회 안정·지연 감소, 팀별 독립 확장.
금융 원장/조회거래=ES 로 이벤트 영속 ↔ 잔액/명세=리드뷰파티셔닝·순서보장, 스냅샷, 사가감사/복구 용이, 순서 일관성 유지, 규제 대응.
실시간 분석/대시보드인입 스트림 ↔ OLAP/검색용 여러 읽기 뷰Kafka, ClickHouse/OpenSearch, 다중 프로젝션다양한 질의 패턴에 최적화된 뷰 제공, 확장 용이.
소셜/콘텐츠 피드포스팅/상호작용=쓰기 ↔ 피드/검색=읽기캐시/검색 엔진, 물질화 뷰짧은 응답시간, 읽기 부하 흡수.
IoT/텔레메트리시계열 쓰기 ↔ 집계/알람용 읽기시계열 DB, 스트림 프로세싱, 물질화 뷰실시간 모니터링·집계 분리, 비용 효율적 처리.
서버리스 CQRS/ESDynamoDB(Event store) ↔ OpenSearch/Redis(읽기)Streams→Lambda 프로젝션, Outbox/CDC운영 자동화·탄력 확장, 관리 부담↓.
단계적 도입 (레퍼런스)동일 DB 내 논리적 CQRS→필요 시 분리eShopOnContainers(간단 CQRS)리스크 낮은 시작·점진 확장.

실제 도입 사례의 코드 구현

Netflix 스타일의 개인화 추천 시스템

시나리오: Netflix 스타일의 개인화 추천 시스템

시스템 구성:

시스템 구성 다이어그램:

graph TB
    User[사용자] --> API[추천 API]
    
    subgraph "명령 측면 (행동 수집)"
        API --> TrackingAPI[행동 추적 API]
        TrackingAPI --> Kafka[Apache Kafka]
        Kafka --> Cassandra[(Cassandra)]
    end
    
    subgraph "ML 파이프라인"  
        Cassandra --> Spark[Apache Spark]
        Spark --> MLModel[ML 모델]
    end
    
    subgraph "조회 측면 (추천 제공)"
        API --> RecommendAPI[추천 조회 API]
        RecommendAPI --> Redis[(Redis)]
        RecommendAPI --> MongoDB[(MongoDB)]
    end
    
    MLModel --> Redis
    MLModel --> MongoDB

Workflow:

  1. 사용자 행동 데이터 실시간 수집 및 Kafka 로 스트리밍
  2. Cassandra 에 원시 행동 데이터 저장
  3. Spark 가 배치로 데이터를 처리하여 ML 모델 훈련
  4. 개인화된 추천 결과를 Redis/MongoDB 에 저장
  5. 사용자 요청 시 사전 계산된 추천 결과 즉시 반환

핵심 역할:

유무에 따른 차이점:

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# 1. 사용자 행동 추적 명령 핸들러
import asyncio
from datetime import datetime
import json

class UserBehaviorTrackingHandler:
    def __init__(self, kafka_producer, cassandra_session):
        self.kafka_producer = kafka_producer
        self.cassandra = cassandra_session

    async def track_user_interaction(self, event: 'UserInteractionEvent'):
        """사용자 상호작용 추적 - Netflix의 시청/평가/검색 행동"""
        
        # 실시간 스트리밍을 위한 Kafka 이벤트 발행
        event_data = {
            'user_id': event.user_id,
            'content_id': event.content_id,
            'interaction_type': event.interaction_type,  # view, rate, search, click
            'interaction_value': event.value,  # 시청 시간, 평점 등
            'timestamp': datetime.now().isoformat(),
            'context': {
                'device_type': event.device_type,
                'session_id': event.session_id,
                'location': event.location
            }
        }
        
        # Kafka로 실시간 이벤트 스트리밍
        await self.kafka_producer.send(
            topic='user-interactions',
            key=event.user_id,
            value=json.dumps(event_data)
        )
        
        # Cassandra에 원시 데이터 저장 (대용량 시계열 데이터 처리)
        await self.cassandra.execute_async("""
            INSERT INTO user_interactions 
            (user_id, content_id, interaction_type, interaction_value, timestamp, context)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            event.user_id, 
            event.content_id, 
            event.interaction_type, 
            event.interaction_value,
            datetime.now(), 
            json.dumps(event_data['context'])
        ))

# 2. 추천 시스템 조회 핸들러
class RecommendationQueryHandler:
    def __init__(self, redis_client, mongodb_client):
        self.redis = redis_client      # 빠른 개인화 추천 캐시
        self.mongodb = mongodb_client  # 복잡한 추천 메타데이터

    async def get_personalized_recommendations(self, user_id: str, category: str = None):
        """개인화된 추천 목록 조회 - 사전 계산된 결과 반환"""
        
        # Redis에서 개인화된 추천 목록 조회 (밀리초 응답)
        cache_key = f"recommendations:{user_id}:{category or 'all'}"
        cached_recommendations = await self.redis.get(cache_key)
        
        if cached_recommendations:
            recommendations = json.loads(cached_recommendations)
            
            # MongoDB에서 상세 메타데이터 조회
            content_ids = [rec['content_id'] for rec in recommendations]
            content_details = await self.mongodb.find(
                "contents", 
                {"_id": {"$in": content_ids}},
                {"title": 1, "thumbnail": 1, "description": 1, "rating": 1}
            )
            
            # 추천 점수와 콘텐츠 상세 정보 결합
            detailed_recommendations = []
            for rec in recommendations:
                content_detail = next(
                    (c for c in content_details if c['_id'] == rec['content_id']), 
                    None
                )
                if content_detail:
                    detailed_recommendations.append({
                        **rec,
                        **content_detail,
                        'recommendation_reason': rec.get('reason', '당신을 위한 추천')
                    })
            
            return detailed_recommendations[:20]  # 상위 20개 반환
        
        # 캐시 미스인 경우 기본 추천 반환 (실제로는 fallback 로직)
        return await self._get_default_recommendations(category)

    async def get_trending_content(self, time_window: str = '24h'):
        """실시간 트렌딩 콘텐츠 조회"""
        trending_key = f"trending:{time_window}"
        trending_data = await self.redis.zrevrange(
            trending_key, 0, 19, withscores=True
        )
        
        return [
            {
                'content_id': content_id.decode(),
                'trending_score': score,
                'rank': idx + 1
            }
            for idx, (content_id, score) in enumerate(trending_data)
        ]

# 3. ML 기반 추천 파이프라인 (배치 처리)
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

class RecommendationPipelineHandler:
    def __init__(self, spark_session, cassandra_config, redis_client):
        self.spark = spark_session
        self.cassandra_config = cassandra_config
        self.redis = redis_client

    async def process_recommendations(self):
        """주기적 추천 모델 훈련 및 결과 생성"""
        
        # Cassandra에서 사용자 행동 데이터 로드
        user_interactions = self.spark.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table="user_interactions", keyspace="recommendations") \
            .load() \
            .filter("timestamp > date_sub(current_date(), 30)")  # 최근 30일 데이터
        
        # 협업 필터링을 위한 데이터 전처리
        indexer_user = StringIndexer(inputCol="user_id", outputCol="user_idx")
        indexer_content = StringIndexer(inputCol="content_id", outputCol="content_idx")
        
        df_indexed = indexer_user.fit(user_interactions).transform(user_interactions)
        df_indexed = indexer_content.fit(df_indexed).transform(df_indexed)
        
        # 암시적 평점 계산 (시청 시간, 완료율 등으로부터)
        df_ratings = df_indexed.withColumn("rating", 
            self._calculate_implicit_rating(df_indexed))
        
        # ALS 모델 훈련
        als = ALS(
            maxIter=10,
            regParam=0.1,
            userCol="user_idx",
            itemCol="content_idx", 
            ratingCol="rating",
            coldStartStrategy="drop"
        )
        
        model = als.fit(df_ratings)
        
        # 모든 사용자에 대한 추천 생성
        user_recommendations = model.recommendForAllUsers(20)
        
        # 추천 결과를 Redis에 저장
        for row in user_recommendations.collect():
            user_id = self._get_original_user_id(row.user_idx)
            recommendations = [
                {
                    'content_id': self._get_original_content_id(rec.content_idx),
                    'score': float(rec.rating),
                    'reason': self._generate_recommendation_reason(rec)
                }
                for rec in row.recommendations
            ]
            
            # Redis에 개인화 추천 저장 (24시간 TTL)
            await self.redis.setex(
                f"recommendations:{user_id}:all",
                86400,  # 24시간
                json.dumps(recommendations)
            )

    def _calculate_implicit_rating(self, df):
        """암시적 평점 계산 로직"""
        # 시청 시간, 완료율, 재시청 여부 등을 종합한 점수
        from pyspark.sql.functions import when, col
        
        return when(
            col("interaction_type") == "view",
            col("interaction_value") / 3600.0  # 시청 시간(초) -> 시간 단위 점수
        ).when(
            col("interaction_type") == "rate", 
            col("interaction_value")  # 직접 평점
        ).when(
            col("interaction_type") == "complete",
            5.0  # 완료 시청 = 높은 점수
        ).otherwise(1.0)

# 4. API 엔드포인트 (FastAPI)
from fastapi import FastAPI, Query
from typing import Optional

app = FastAPI(title="Netflix-Style Recommendation API")

@app.post("/track")
async def track_interaction(interaction: UserInteractionEvent):
    """사용자 상호작용 추적 - 명령 API"""
    await behavior_handler.track_user_interaction(interaction)
    return {"status": "tracked"}

@app.get("/users/{user_id}/recommendations")
async def get_recommendations(
    user_id: str, 
    category: Optional[str] = Query(None, description="콘텐츠 카테고리")
):
    """개인화된 추천 조회 - 쿼리 API"""
    recommendations = await recommendation_handler.get_personalized_recommendations(
        user_id, category
    )
    return {
        "user_id": user_id,
        "recommendations": recommendations,
        "generated_at": datetime.now().isoformat()
    }

@app.get("/trending")
async def get_trending(time_window: str = Query("24h", regex="^(1h|24h|7d)$")):
    """트렌딩 콘텐츠 조회"""
    trending = await recommendation_handler.get_trending_content(time_window)
    return {
        "time_window": time_window,
        "trending": trending
    }

# 5. 이벤트 프로세서 (실시간 트렌딩 계산)
class TrendingEventProcessor:
    def __init__(self, kafka_consumer, redis_client):
        self.kafka_consumer = kafka_consumer
        self.redis = redis_client

    async def process_trending_events(self):
        """실시간 트렌딩 점수 계산"""
        async for message in self.kafka_consumer:
            event_data = json.loads(message.value)
            
            if event_data['interaction_type'] == 'view':
                # 실시간 트렌딩 점수 업데이트
                content_id = event_data['content_id']
                timestamp = datetime.fromisoformat(event_data['timestamp'])
                
                # 시간 기반 가중치 적용 (최근일수록 높은 가중치)
                weight = self._calculate_time_weight(timestamp)
                
                # Redis Sorted Set에 트렌딩 점수 누적
                await self.redis.zincrby("trending:24h", weight, content_id)
                await self.redis.zincrby("trending:7d", weight * 0.7, content_id)
                
                # TTL 설정으로 오래된 데이터 자동 정리
                await self.redis.expire("trending:24h", 86400)  # 24시간
                await self.redis.expire("trending:7d", 604800)  # 7일

    def _calculate_time_weight(self, timestamp):
        """시간 기반 가중치 계산 (최근일수록 높은 가중치)"""
        time_diff = datetime.now() - timestamp
        hours_ago = time_diff.total_seconds() / 3600
        return max(1.0, 24.0 - hours_ago) / 24.0  # 0-24시간 전 데이터에 가중치
Serverless CQRS + Event Sourcing

시나리오: DynamoDB(Event Store) 에 이벤트를 Append 하고, DynamoDB Streams → AWS Lambda 로 OpenSearch(또는 Redis) Projection 을 생성.

효과: 서버 관리 최소화, 초당 이벤트 폭증에도 원활한 수평 확장, 비용 효율성.

시스템 구성:

시스템 구성 다이어그램:

graph TB
  C[Client] --> APIGW[API Gateway]
  APIGW --> W["Lambda(Command)"]
  W --> DDB[(DynamoDB Event Store)]
  DDB --> DS[DynamoDB Streams]
  DS --> P["Lambda(Projection)"]
  P --> OS[OpenSearch / Redis]
  C --> QAPIGW["API Gateway(Query)"]
  QAPIGW --> QR["Lambda(Query)"]
  QR --> OS

Workflow:

  1. Command Lambda 가 OrderCreated 이벤트를 이벤트 스토어 (DDB) 에 Append
  2. Streams 가 이벤트를 트리거 → Projection Lambda 가 OpenSearch/Redis 업데이트
  3. Query Lambda 가 Projection 에서 조회 반환

유무에 따른 차이점:

구현 예시 (Python Lambda 핵심)

대형 전자상거래 플랫폼에서 " 주문 (Order)” 과 " 재고 (Inventory)” 관리

시나리오: 대형 전자상거래 플랫폼에서 " 주문 (Order)” 과 " 재고 (Inventory)" 를 CQRS + Event Sourcing 방식으로 관리.

시스템 구성:

시스템 구성 다이어그램:

graph TB
    User[사용자] --> API[API Gateway]
    API --> OrderCmdSvc[Order Command Service]
    API --> OrderQrySvc[Order Query Service]
    OrderCmdSvc --> WriteDB[(Order WriteDB)]
    OrderCmdSvc --> EventBus["Event Bus (Kafka)"]
    EventBus --> OrderQrySvc
    OrderQrySvc --> ReadDB[(Order ReadDB)]

Workflow:

  1. 사용자가 주문 생성 요청 (Command)
  2. Order Command Handler 가 Write Model 에 저장
  3. 주문 생성 이벤트 (OrderCreated) 를 Event Store 에 발행
  4. Order Query Handler 가 이벤트를 소비하여 Read Model 갱신
  5. 사용자가 주문 내역 또는 재고를 실시간 조회 (Query)

핵심 역할:

유무에 따른 차이점:

구현 예시 (Python + Kafka + Redis 등):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# command_models.py
class CreateOrderCommand:
    """주문 생성 명령 - CQRS의 Command 역할"""
    def __init__(self, order_id, items):
        self.order_id = order_id
        self.items = items

# event_models.py
class OrderCreatedEvent:
    """주문 생성 이벤트 - Event Sourcing 핵심 역할"""
    def __init__(self, order_id, items):
        self.order_id = order_id
        self.items = items

# handlers.py (Command Side)
def handle_create_order(command: CreateOrderCommand):
    """명령 처리 -> Write DB 저장 + 이벤트 발행"""
    save_order_to_write_db(command)
    event = OrderCreatedEvent(command.order_id, command.items)
    publish_event_to_kafka("order_created", event.__dict__)

# handlers.py (Query Side)
def handle_order_created_event(event: OrderCreatedEvent):
    """이벤트 처리 -> Read DB 갱신"""
    update_order_in_read_db(event.order_id, event.items)

# message_broker.py (Kafka Publisher & Consumer)
from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def publish_event_to_kafka(topic, event_data):
    """이벤트 발행"""
    producer.send(topic, event_data)
    producer.flush()

def consume_order_created_events():
    """이벤트 수신 후 Query 모델 갱신"""
    consumer = KafkaConsumer(
        'order_created',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    for message in consumer:
        event_data = message.value
        event = OrderCreatedEvent(**event_data)
        handle_order_created_event(event)
전자상거래 플랫폼

시스템 구성:

시스템 구성도:

graph TB
    subgraph "사용자 인터페이스"
        A[웹 애플리케이션]
        B[모바일 앱]
    end
    
    subgraph "명령 측 (Order Service)"
        C[Order API]
        D[Order Handler]
        E[Order DB]
    end
    
    subgraph "조회 측 (Catalog Service)"
        F[Catalog API]
        G[Search Handler]
        H[Read DB]
    end
    
    subgraph "인프라"
        I[Event Bus]
        J[Cache Layer]
    end
    
    A --> C
    B --> C
    A --> F
    B --> F
    C --> D
    D --> E
    D --> I
    I --> G
    G --> H
    G --> J
    F --> G

Workflow:

  1. 사용자가 주문 생성 요청
  2. Order Handler 가 비즈니스 로직 검증
  3. Order DB 에 주문 정보 저장
  4. OrderCreated 이벤트 발행
  5. Catalog Service 가 이벤트 수신
  6. Read DB 업데이트 (재고 감소)
  7. 캐시 갱신

CQRS 역할:

CQRS 유무에 따른 차이점:

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# CQRS 패턴을 활용한 전자상거래 주문 시스템
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Any
import uuid
from datetime import datetime
import asyncio

# 도메인 이벤트
@dataclass
class DomainEvent:
    event_id: str
    aggregate_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: datetime
    version: int

# 명령 (Commands)
@dataclass
class CreateOrderCommand:
    customer_id: str
    items: List[Dict[str, Any]]
    
@dataclass
class UpdateInventoryCommand:
    product_id: str
    quantity: int

# 쿼리 (Queries)
@dataclass
class GetOrderHistoryQuery:
    customer_id: str
    
@dataclass
class SearchProductsQuery:
    keyword: str
    category: str = None

# 이벤트 스토어
class EventStore:
    def __init__(self):
        self.events: List[DomainEvent] = []
        self.subscribers: List[callable] = []
    
    async def save_event(self, event: DomainEvent):
        self.events.append(event)
        # 이벤트 발행
        for subscriber in self.subscribers:
            await subscriber(event)
    
    def get_events(self, aggregate_id: str) -> List[DomainEvent]:
        return [e for e in self.events if e.aggregate_id == aggregate_id]
    
    def subscribe(self, handler: callable):
        self.subscribers.append(handler)

# 애그리게이트 루트
class Order:
    def __init__(self, order_id: str):
        self.id = order_id
        self.customer_id = None
        self.items = []
        self.status = "PENDING"
        self.total_amount = 0
        self.version = 0
        self.uncommitted_events = []
    
    def create_order(self, customer_id: str, items: List[Dict[str, Any]]):
        if self.status != "PENDING":
            raise ValueError("Order already created")
        
        self.customer_id = customer_id
        self.items = items
        self.total_amount = sum(item['price'] * item['quantity'] for item in items)
        self.status = "CREATED"
        
        event = DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type="OrderCreated",
            data={
                "customer_id": customer_id,
                "items": items,
                "total_amount": self.total_amount
            },
            timestamp=datetime.now(),
            version=self.version + 1
        )
        self.uncommitted_events.append(event)
        self.version += 1
    
    def get_uncommitted_events(self):
        return self.uncommitted_events
    
    def mark_events_as_committed(self):
        self.uncommitted_events = []

# 명령 핸들러
class OrderCommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.orders: Dict[str, Order] = {}
    
    async def handle_create_order(self, command: CreateOrderCommand):
        order_id = str(uuid.uuid4())
        order = Order(order_id)
        order.create_order(command.customer_id, command.items)
        
        # 이벤트 저장
        for event in order.get_uncommitted_events():
            await self.event_store.save_event(event)
        
        order.mark_events_as_committed()
        self.orders[order_id] = order
        
        return order_id

# 읽기 모델
@dataclass
class OrderView:
    id: str
    customer_id: str
    items: List[Dict[str, Any]]
    total_amount: float
    status: str
    created_at: datetime

@dataclass
class ProductView:
    id: str
    name: str
    price: float
    category: str
    inventory: int
    description: str

# 조회 핸들러
class OrderQueryHandler:
    def __init__(self):
        self.order_views: List[OrderView] = []
    
    async def handle_get_order_history(self, query: GetOrderHistoryQuery) -> List[OrderView]:
        return [order for order in self.order_views 
                if order.customer_id == query.customer_id]
    
    async def update_order_view(self, event: DomainEvent):
        if event.event_type == "OrderCreated":
            order_view = OrderView(
                id=event.aggregate_id,
                customer_id=event.data["customer_id"],
                items=event.data["items"],
                total_amount=event.data["total_amount"],
                status="CREATED",
                created_at=event.timestamp
            )
            self.order_views.append(order_view)

class ProductQueryHandler:
    def __init__(self):
        self.product_views: List[ProductView] = []
        # 초기 상품 데이터
        self.product_views = [
            ProductView("1", "노트북", 1500000, "전자제품", 10, "고성능 노트북"),
            ProductView("2", "마우스", 50000, "전자제품", 50, "무선 마우스")
        ]
    
    async def handle_search_products(self, query: SearchProductsQuery) -> List[ProductView]:
        results = self.product_views
        
        if query.keyword:
            results = [p for p in results if query.keyword in p.name]
        
        if query.category:
            results = [p for p in results if p.category == query.category]
        
        return results
    
    async def update_inventory(self, event: DomainEvent):
        if event.event_type == "OrderCreated":
            for item in event.data["items"]:
                product_id = item["product_id"]
                quantity = item["quantity"]
                
                for product in self.product_views:
                    if product.id == product_id:
                        product.inventory -= quantity

# CQRS 시스템 조합
class CQRSSystem:
    def __init__(self):
        self.event_store = EventStore()
        self.order_command_handler = OrderCommandHandler(self.event_store)
        self.order_query_handler = OrderQueryHandler()
        self.product_query_handler = ProductQueryHandler()
        
        # 이벤트 구독 설정
        self.event_store.subscribe(self.order_query_handler.update_order_view)
        self.event_store.subscribe(self.product_query_handler.update_inventory)
    
    async def execute_command(self, command):
        if isinstance(command, CreateOrderCommand):
            return await self.order_command_handler.handle_create_order(command)
    
    async def execute_query(self, query):
        if isinstance(query, GetOrderHistoryQuery):
            return await self.order_query_handler.handle_get_order_history(query)
        elif isinstance(query, SearchProductsQuery):
            return await self.product_query_handler.handle_search_products(query)

# 사용 예시
async def main():
    cqrs_system = CQRSSystem()
    
    # 명령 실행: 주문 생성
    create_order_cmd = CreateOrderCommand(
        customer_id="customer_123",
        items=[
            {"product_id": "1", "name": "노트북", "price": 1500000, "quantity": 1},
            {"product_id": "2", "name": "마우스", "price": 50000, "quantity": 2}
        ]
    )
    
    order_id = await cqrs_system.execute_command(create_order_cmd)
    print(f"주문 생성됨: {order_id}")
    
    # 조회 실행: 주문 이력 조회
    order_history_query = GetOrderHistoryQuery(customer_id="customer_123")
    order_history = await cqrs_system.execute_query(order_history_query)
    
    print(f"주문 이력: {len(order_history)}건")
    for order in order_history:
        print(f"  - 주문 ID: {order.id}, 총액: {order.total_amount}")
    
    # 조회 실행: 상품 검색
    product_search_query = SearchProductsQuery(keyword="노트북")
    products = await cqrs_system.execute_query(product_search_query)
    
    print(f"상품 검색 결과: {len(products)}건")
    for product in products:
        print(f"  - {product.name}: {product.inventory}개 남음")

if __name__ == "__main__":
    asyncio.run(main())

운영 및 최적화 (Operations & Optimization)

보안 및 거버넌스

CQRS 보안은 **" 두 개의 길 + 울타리 “**로 보면 쉽다.

영역핵심 위험보안 요구/정책구현 방안모니터링/감사
인증/인가 (API)권한 상승, 토큰 오용Command/Query 권한 분리, 최소권한 (RBAC/ABAC)OAuth 2.0 BCP(권장 플로우, PKCE/PAR), JWT 서명·클레임 검증(exp/iss/aud 등)접근 로그, 토큰 실패율/이상 패턴 탐지
통신 보안도청·중간자 공격외부 TLS 1.3, 내부 mTLS 강제L7 게이트웨이 TLS1.3, 서비스 메시 인증정책 (mTLS)인증서 만료/회전, 암호 스위트 준수
이벤트/브로커위·변조, 무단 구독브로커 인증·인가, 토픽 분리Kafka SASL_SSL + ACL, 프로듀서/컨슈머 자격 분리토픽 접근 실패·비정상 소비량 경보
동기화/일관성이중쓰기, 유실Outbox+CDC, 멱등 소비트랜잭셔널 아웃박스 + Debezium, 재처리·DLQ오프셋 지연·누락 이벤트 대시보드
데이터 보호PII 누출전송·저장 암호화, 키관리·회전필드 레벨 암호화, 키 볼트/KMS, 로테이션키 만료·교체 감사, 민감필드 접근 추적
스키마 거버넌스소비자 장애호환성 모드(Backward 등) 규정Confluent Schema Registry + CI 검증스키마 변경 이력·호환성 실패 알림
규정 준수책임소재 불분명최소권한 (AC-6), 감사 (AU-2) 매핑역할·권한 증적, 이벤트/관리행위 로그 보존정기 리뷰·침해 대응 (72h 규정 등 도메인별)
제품·UX최종 일관성 혼동지연 알림·리프레시 UX" 처리 중/곧 반영 " 피드백, 수동 새로고침읽기 - 쓰기 지연 (SLA) 관측

핵심은 경계 분리와 표준화다. 쓰기·읽기·브로커 각각에 맞춤 보호대(권한·TLS/mTLS·ACL) 를 치고, Outbox+CDC스키마 호환성으로 데이터 흐름을 안정화한다. 마지막으로 최소권한·감사를 규정과 매핑해 신뢰를 완성한다.

모니터링 및 관측성

CQRS 에서는 쓰기와 읽기가 따로 움직여 읽기가 조금 늦게 반영될 수 있다.
그래서 두 가지 축을 꼭 본다:

  1. 이벤트 파이프라인이 잘 흐르는지 (컨슈머 랙·실패율·DLQ),
  2. 읽기 모델이 제때 갱신되는지(프로젝션 랙).
    그리고 OpenTelemetry 로 요청→이벤트→프로젝션→조회를 하나의 트레이스로 묶으면, 어디서 느려졌는지 바로 찾을 수 있다.
영역핵심 지표의미/목적수집원/도구경보 아이디어
이벤트 파이프라인Consumer Lag소비 지연 (오프셋 기준) 로 처리 적체 파악Kafka JMX/Exporter, Confluent/MSK 모니터링랙의 절대값·증가율 경보, 파티션별 임계치.
처리율/실패율초당 이벤트 처리·실패 추세Broker/Consumer metrics실패율 급증·처리율 급감 시 경보
DLQ 크기/증가율재처리 필요 이벤트 적체브로커/파이프라인 메트릭DLQ 증가율·체류 시간 경보
파티션 스큐일부 파티션 과부하Exporter·대시보드스큐 비율 초과 시 재파티셔닝
저장소·동기화프로젝션 랙 (seconds)이벤트→읽기모델 반영 지연 (EC 가시화)프로젝션 워커 메트릭/로그랙 P95/P99 경보 (업무 SLA 기준).
Write/Read DB 지연·에러율각각의 병목/장애 조기 탐지DB/드라이버 메트릭지연·에러율 임계치
추적/로깅엔드투엔드 트레이스Command→Event→Projection→Query 연결OpenTelemetry, W3C Trace Context스팬 지연 초과·오류 스팬 비율 경보.
신뢰성 보강Outbox 배출 지연아웃박스→브로커 전파 지연CDC/퍼블리셔 메트릭배출 지연·미전파 건수 경보.
스키마 검증 실패율이벤트 스키마 불일치 탐지레지스트리/소비자 로깅실패율 급증 경보

관측의 축은 이벤트 흐름 (랙·실패·DLQ)읽기 반영 (프로젝션 랙), 그리고 엔드투엔드 추적이다. 이 세 축만 꾸준히 보면 CQRS 의 지연 일관성병목을 체계적으로 다 잡을 수 있다. 지표는 Kafka/Exporter·DB 메트릭·OpenTelemetry 로 표준화해 대시보드와 경보에 연결하라.

실무 적용 고려사항 및 주의점

카테고리고려사항위험/주의권장사항핵심 지표
설계/도입Bounded Context 기준 점진 도입전면 전환 실패 리스크파일럿→확장, 간단 도메인은 CRUD 유지전환 범위 대비 결함률
설계/도입도메인 복잡성 평가과도한 아키텍처읽기≫쓰기·질의 다양에서만 적용R:W, 질의 수/복잡도
일관성/동기화최종 일관성으로 인한 UXRYW 미보장낙관적 UI/세션 일관성, 중요 화면 동기화뷰 갱신 지연 p95
일관성/동기화Projection/뷰 설계중복·정합성 관리이벤트 기반 뷰 갱신/리플레이프로젝션 지연/실패율
신뢰성/내결함성듀얼라이트 (유실/중복)DB↔브로커 불일치Transactional Outbox + 멱등미전송/중복 이벤트 수
신뢰성/내결함성분산 트랜잭션전역 롤백 부재Saga(보상/오케스트레이션)보상률/타임아웃
이벤트/순서/버전순서 보장파티션 확장 시 순서 붕괴파티션 키로 엔티티 고정, 단일 파티션 선택적out-of-order 비율
이벤트/순서/버전이벤트/스키마 버전리플레이·호환성 오류이벤트/스키마 버전 전략·백필 플랜버전 충돌 건수
인프라/운영/관측메시징/모니터링 인프라원인 미상 지연Lag, 처리율, 오류율, DLQLag, DLQ 적재량
팀/테스트/거버넌스학습 곡선/테스트 복잡생산성 초기 저하교육·런북·계약/통합 테스트실패 시 재처리 성공률
보안/감사R/W API 분리권한·감사 누락최소권한, 감사로그, 추적 ID감사 이벤트 결손률
운영 자동화Outbox/CDC 운영릴레이·청소 비용Debezium Outbox Router/Quarkus릴레이 지연/적체

성능 최적화 전략 및 고려사항

핵심은 읽기와 쓰기의 길을 분리하고, 그 사이를 이벤트로 느슨하게 연결하면서 성능·확장·정합성을 균형 있게 잡는 것.

카테고리전략/고려사항정의·원리구성 요소목적사용 상황특징/주의
읽기 최적화캐시/프로젝션/머뷰쓰기와 분리된 읽기 전용 뷰로 핫패스 최소화Redis/Elastic/읽기 DB, Projection지연↓·처리량↑조회 비율↑ 서비스캐시 무효화 정책 필수.
쓰기·확장비동기/배치/파티셔닝개별 트랜잭션 비용↓, 병렬 처리↑배치 프로듀서, 파티션 키처리량↑·지연 균형고트래픽 쓰기배치 크기·재시도·백프레셔 튜닝
이벤트·브로커·순서키=파티션 고정, 멱등성, DLQ파티션 내 순서 보장·중복 허용 처리Kafka/Rabbit, Idempotent Consumer순서·신뢰성엔티티별 이벤트 흐름글로벌 순서 불가, 키 설계가 전부.
정합성·동기화Outbox+CDC“DB 커밋=이벤트 기록 " 원자화→CDC 전달Outbox 테이블, Debezium SMT, Broker이중쓰기 제거분산 동기화약간의 지연, 표준화 쉬움.
스냅샷주기 스냅샷/압축이벤트 재생 비용 절감스냅샷 스토어리플레이 비용↓ES 결합용량/주기 설계 필요.
스키마 진화Backward-compat소비자/프로듀서 호환성 유지스키마 레지스트리/버전 핸들러배포 리스크↓장수 서비스Producer-First 금지, Consumer-First 권장.
운영·관측성분산 추적/지연 모니터링명령→이벤트→프로젝션→조회 체인 추적OTel/APM/로그이상 탐지·SLA다팀 운영상관 ID/샘플링·알람 룰 필수
보안·거버넌스RBAC/암호화/격리원천·전송·저장 보안IAM/KMS/VPC규제 충족멀티테넌시·규제 도메인성능·비용 트레이드오프
비용·용량TTL/아카이빙/압축오래된 데이터 비용↓DLQ/아카이브/Topic 관리비용 최적화대용량 로그회수·보존 정책 명문화

작게 시작해 (단일 DB+ 프로젝션) 병목을 찾고, 필요 시 저장소 분리·배치·파티셔닝으로 확장한다.
정합성은 Outbox+CDC 로, 순서는 파티션 키로 지킨다.
스키마는 뒤호환을 우선하며, 관측성과 비용·보안을 동일한 1 급 요구사항으로 다룬다.

고급 주제 (Advanced Topics)

도전 과제

핵심은 명령 (쓰기) 모델조회 (읽기) 모델의 분리로 인해 발생하는 추가적인 복잡성을 어떻게 다룰 것인가이다.

가장 큰 기술적 문제는 ’ 데이터 일관성 ‘ 이다.
쓰기 모델의 데이터가 읽기 모델로 즉시 동기화되지 않고, 비동기 이벤트에 의존하기 때문에 일시적인 데이터 불일치가 발생할 수 있다. 이는 사용자가 최신 정보를 보지 못하는 결과를 초래할 수 있다.
또한, 아키텍처가 복잡해지면서 개발 및 운영 부담이 커진다. 두 개의 모델을 관리해야 하고, 그 사이의 비동기 이벤트를 추적하고 모니터링해야 한다. 이 과정에서 데이터 중복이나 이벤트 스키마 관리에 대한 문제도 발생한다.

이러한 기술적 문제들은 결국 팀 구조협업 방식에 영향을 준다. 명령 모델과 조회 모델을 담당하는 팀을 분리하거나, 기존의 CRUD(Create, Read, Update, Delete) 기반 시스템에서 CQRS 로 전환하는 과정에서 기술 부채마이그레이션이라는 난관에 부딪히게 된다. CQRS 도입은 단순히 기술 스택을 바꾸는 것이 아니라, 조직의 운영 방식과도 깊은 관련이 있는 중대한 변화이다.

카테고리도전 과제원인영향해결 방안
기술적 복잡성 및 일관성 관리최종 일관성 (Eventual Consistency)명령/조회 모델의 분리 및 비동기 이벤트 통신사용자 경험 저하, 일시적 데이터 불일치사용자 인터페이스 (UI) 에서 지연을 명시하거나, 동기 업데이트 옵션 설계, 사용자에게 피드백 제공
분산 트랜잭션 관리여러 데이터베이스 간 ACID 트랜잭션 보장 어려움데이터 무결성 위험, 복잡한 비즈니스 로직 처리 어려움Saga 패턴, 2PC(Two-Phase Commit), 트랜잭션 아웃박스 (Transactional Outbox) 패턴
아키텍처 복잡성읽기/쓰기 모델의 분리 및 추가 시스템 (메시징 시스템 등) 도입개발, 운영, 테스트 난이도 증가, 팀 온보딩 비용 상승마이크로서비스 경계 명확화, 점진적 도입 (Incremental adoption), 자동화된 테스트 및 배포 파이프라인 구축
데이터 및 이벤트 관리데이터 중복 및 유지보수읽기 모델의 비정규화된 데이터 저장스토리지 비용 증가, 동기화 관리 비용 상승스냅샷 (Snapshot), 변경 데이터 캡처 (CDC, Change Data Capture) 활용, 효율적인 동기화 전략 수립
이벤트 순서, 중복, 유실분산 시스템의 비결정성, 네트워크 지연, 재시도 로직데이터 불일치, 비즈니스 로직 오류메시지 브로커 파티셔닝 (Partitioning), 멱등성 (Idempotency) 프로젝션, 트랜잭션 아웃박스 (Transactional Outbox) 패턴
이벤트 스키마 진화비즈니스 요구사항 변경에 따른 이벤트 구조 변경기존 프로젝션 (Projection) 과의 호환성 문제, 기술 부채스키마 레지스트리 (Schema Registry) 도입, 이벤트 버전 관리 및 호환성 유지 전략 수립
조직 및 운영 부담운영 및 모니터링 복잡성분산된 컴포넌트들 (DB, 메시징, 서비스)장애 진단 및 추적 어려움, 운영 비용 증가통합 로깅, 분산 추적 (Distributed Tracing), 모니터링 대시보드 (Grafana, Kibana 등) 구축
팀 구조 및 협업명령/조회 책임 분리에 따른 팀 역할 재정의개발 속도 저하, 의존성 관리 문제Conway’s Law(콘웨이의 법칙) 를 고려한 팀 구조 재편, API(Application Programming Interface) 우선 설계, 계약 테스트 (Contract Testing) 도입
레거시 시스템 통합기존 CRUD(Create, Read, Update, Delete) 기반 시스템과의 호환성마이그레이션 복잡성, 기술 부채점진적 마이그레이션, 스트랭글러 피그 (Strangler Fig) 패턴 적용

CQRS 도입의 도전 과제는 크게 세 가지 영역으로 나뉜다.

  1. 기술적 복잡성 및 일관성 관리이다.
    분리된 모델과 비동기 통신 때문에 발생하는 최종 일관성 문제는 사용자 경험에 직접적인 영향을 준다. 이를 해결하기 위해 Saga 패턴 같은 분산 트랜잭션 관리 기법을 사용하거나, 아키텍처의 복잡성을 관리하는 전략이 필요하다.

  2. 데이터 및 이벤트 관리이다.
    읽기 모델의 데이터 중복과 이벤트의 순서, 중복, 유실 문제는 시스템의 신뢰성을 떨어뜨릴 수 있다. 멱등성을 보장하는 프로젝션 (Projection) 이나 트랜잭션 아웃박스 (Transactional Outbox) 패턴을 통해 이를 방지해야 한다. 또한, 비즈니스 변경에 따라 발생하는 이벤트 스키마 진화 문제도 중요한 관리 포인트이다.

  3. 조직 및 운영 부담이다.
    CQRS 는 단순히 기술적인 선택이 아니라, 팀 구조와 운영 방식을 재정립하는 일이다. 복잡해진 시스템을 효율적으로 모니터링하고, 기존 레거시 시스템과의 통합을 위한 명확한 **마이그레이션 전략 (예: Strangler Fig 패턴)**이 필요하다. 이러한 도전 과제들을 극복하려면 기술적 솔루션과 함께 조직의 변화를 유도하는 노력이 병행되어야 한다.

생태계 및 관련 기술

CQRS(Command Query Responsibility Segregation) 는 단독으로 사용되기보다는 여러 기술과 결합하여 그 진가를 발휘하는 패턴이다.
이 아키텍처의 핵심은 비동기 통신이다.
명령 (Command) 이 처리된 후, 그 결과를 이벤트 브로커Kafka(카프카)RabbitMQ(래빗엠큐) 를 통해 읽기 모델로 전달하여 데이터를 업데이트한다. 이 과정에서 이벤트 소싱 (Event Sourcing) 과 같은 패턴을 함께 사용하면 모든 상태 변화 이력을 보존할 수 있어 시스템의 신뢰성을 높일 수 있다.

또한, CQRS 에서는 읽기 모델의 성능을 극대화하기 위해 목적에 맞는 다양한 데이터 저장소를 활용한다.
예를 들어, 복잡한 검색 기능은 **Elasticsearch(엘라스틱서치)**를 사용하고, 빠른 응답이 필요한 조회는 **Redis(레디스)**와 같은 인메모리 캐시를 활용하는 방식이다. 이러한 분산된 시스템을 효과적으로 관리하기 위해 **OpenTelemetry(오픈텔레메트리)**와 같은 관측성 (Observability) 도구는 필수적이며, **CloudEvents(클라우드이벤트)**와 같은 표준은 시스템 간의 원활한 통신을 보장한다.

카테고리기술/표준역할
메시징 및 이벤트 처리Apache Kafka/RabbitMQ명령 처리 결과를 읽기 모델에 비동기적으로 전달하는 이벤트 브로커
Event Sourcing모든 상태 변화를 이벤트로 저장하여 감사 (Audit) 및 과거 상태 재구성 가능
CloudEvents이벤트에 대한 메타데이터 (Metadata) 를 표준화하여 상호 운용성 보장
데이터 저장소 및 관리PostgreSQL/MongoDB쓰기 모델의 원본 데이터 저장소로 사용 (이벤트 스토어 역할 가능)
Elasticsearch/OpenSearch복잡한 검색 및 집계 쿼리를 위한 읽기 모델 저장소
Redis빠른 응답이 필요한 읽기 모델을 위한 인메모리 캐시
Debezium CDC/Transactional Outbox이중 쓰기 (Dual Write) 문제를 해결하고 DB 변경 내용을 이벤트로 캡처
관측성 및 표준OpenTelemetry(OTel)분산 시스템 전반의 로그 (Log), 메트릭 (Metric), 추적 (Trace) 을 통합 관리하는 관측성 표준
JSON Schema/Avro이벤트 데이터의 구조와 스키마 (Schema) 를 정의하여 버전 호환성을 관리
구현 및 프레임워크Axon FrameworkCQRS 와 이벤트 소싱 구현을 돕는 전용 프레임워크
Azure/AWS Patterns클라우드 환경에서 CQRS 를 설계하고 구현하는 가이드라인 제공

CQRS 는 단일 기술이 아닌 여러 기술의 조합으로 완성되는 아키텍처 패턴이다.
가장 핵심적인 요소는 메시징 시스템으로, Kafka(카프카) 와 같은 이벤트 브로커가 비동기 통신의 중추 역할을 한다. 쓰기 모델의 데이터는 일반적으로 관계형 또는 NoSQL(NoSQL) 데이터베이스에 저장되고, 읽기 모델은 사용자 조회에 최적화된 Elasticsearch(엘라스틱서치) 와 같은 검색 엔진이나 Redis(레디스) 와 같은 캐시를 활용하여 성능을 높인다. 이러한 분산 환경을 효율적으로 운영하기 위해서는 OpenTelemetry(오픈텔레메트리) 같은 표준을 통해 시스템의 상태를 **관측 (Observability)**하고, 스키마 관리 도구를 사용해 데이터 호환성을 유지하는 것이 중요하다.

최신 기술 트렌드와 미래 방향

CQRS(Command Query Responsibility Segregation) 는 이제 단순한 아키텍처 패턴을 넘어, 최신 기술 트렌드와 결합하며 진화하고 있다. 가장 두드러진 변화는 클라우드 네이티브 환경으로의 전환이다. 개발자들은 더 이상 서버를 직접 관리하지 않는 서버리스 (Serverless) 아키텍처를 선호하며, 이를 통해 운영 부담을 줄이고 개발에 집중한다. 이와 함께, AI(인공지능)/ML(머신러닝) 기술이 도입되어 쿼리 패턴을 분석하고 읽기 모델을 자동으로 최적화하는 등 시스템이 스스로 진화하는 방향으로 나아가고 있다.

또한, 사용자에게 더 빠른 서비스를 제공하기 위해 **에지 컴퓨팅 (Edge Computing)**이 중요한 트렌드로 부상하고 있다. CDN(Content Delivery Network) 에지에 읽기 모델을 캐싱 (Caching) 하여 데이터 접근 지연 시간을 줄이는 방식이 대표적이다. 마지막으로, 복잡해진 CQRS 시스템을 효율적으로 운영하기 위해 **DevOps(데브옵스)**와 **GitOps(깃옵스)**를 기반으로 한 자동화된 배포 및 모니터링이 필수가 되고 있다.

카테고리기술 트렌드핵심 내용
클라우드/서버리스서버리스 CQRSAWS Lambda, DynamoDB Streams 등을 활용하여 서버 관리 없이 CQRS 구현. 운영 비용 및 관리 부담 절감.
클라우드 네이티브Kubernetes, Istio 와 같은 기술로 CQRS 컴포넌트의 자동 확장, 트래픽 관리, 유연성 확보.
지능형 자동화AI/ML 기반 최적화ML 모델이 쿼리 패턴을 분석하여 읽기 모델의 인덱스 구조, 파티셔닝 등을 자동으로 최적화.
스트림 프로세싱Apache Flink, Apache Spark Streaming 등을 통해 대용량 이벤트를 고성능으로 실시간 처리.
분산 및 에지 컴퓨팅에지 컴퓨팅 통합CDN 에지에 읽기 모델을 캐싱하거나, 지역별로 명령 처리를 분산하여 사용자에게 더 가까운 서비스 제공.
마이크로서비스/데이터 메시각 도메인 (Domain) 을 독립적인 데이터 제품 (Data Product) 으로 관리하여 데이터 거버넌스 (Governance) 및 통합 용이성 확보.
개발/운영 혁신DevOps 및 GitOps코드형 인프라 (Infrastructure as Code) 를 사용하여 배포를 자동화하고, Git 을 통해 시스템 상태를 관리.
관측성 강화Prometheus, Grafana, Jaeger 등을 통합하여 분산된 CQRS 아키텍처의 상태를 실시간으로 모니터링하고 장애를 진단.

CQRS 의 최신 트렌드는 클라우드 네이티브로의 전환과 지능형 자동화에 초점을 맞추고 있다. 서버리스 아키텍처를 통해 인프라 관리 부담을 줄이고, Kubernetes(쿠버네티스) 와 같은 기술로 시스템을 유연하게 확장한다. 또한, AI/ML을 활용하여 읽기 모델을 자동으로 최적화하고, 스트림 프로세싱 기술로 대용량 데이터를 실시간으로 처리하는 추세가 강화되고 있다. 사용자 경험을 개선하기 위한 에지 컴퓨팅의 도입도 활발하며, 이러한 복잡성을 효율적으로 관리하기 위해 **DevOps(데브옵스)**와 관측성은 필수적인 요소로 자리 잡았다.


최종 정리 및 학습 가이드

내용 정리

도로 두 개” 를 깐다 생각하면 쉽다.
한 도로 (쓰기) 는 규칙·검증이 촘촘하고, 다른 도로 (읽기) 는 고속도로처럼 빠르다.
쓰기에서 생긴 변화는 이벤트로 읽기 쪽 표지판 (프로젝션) 을 업데이트한다. 그래서 읽기를 크게 늘려도 쓰기 성능이 무너지지 않는다. 대신 표지판 갱신이 살짝 늦을 수 있음 (최종 일관성) 을 제품·UX 로 보완해야 한다.

  1. 무엇/왜

    • 정의: 읽기/쓰기 모델·파이프라인 분리.
    • 장점: 조회 가속, 쓰기 안정화, 독립 스케일, 폴리글랏, 보안 경계.
    • 주의: 복잡성·일관성 관리.
  2. 어떻게

    • 읽기: 프로젝션/머티리얼라이즈드 뷰, 캐시·인덱스 최적화.
    • 쓰기: 트랜잭션·도메인 규칙·권한 집중.
    • 동기화: 이벤트·메시지 브로커, Outbox/CDC.
  3. 언제

    • 읽기 편중, 도메인 규칙 복잡, 팀 분리·배포 독립이 필요, 데이터 저장소 이질성 수용 필요 시 적합.
    • 단순 CRUD 엔 과설계가 될 수 있음.
  4. 사례

    • Microsoft eShopOnContainers: 단순화된 CQRS 레퍼런스.
    • Netflix Tudum: CQRS 구현을 RAW/Hollow 방향으로 최적화.
  5. 함께 볼 패턴

    • ES(감사·재생성), EDA, Sagas/Outbox, Database-per-service.

학습 로드맵

Phase기간목표주요 학습 내용산출물/성과
1. 기초 이해1~2 주개념·배경 이해CRUD 한계, CQS vs CQRS개념 노트, 비교표
2. 이론 학습1~2 주설계 원리 습득구성 요소, 데이터 흐름, 일관성 모델아키텍처 다이어그램
3. 분석1 주적용 판단장단점, 트레이드오프, 사례 분석적용 가이드
4. 구현 실습2~4 주CQRS 구축단일 앱 구현, Outbox+CDC, 파티션 설계동작 예제 코드
5. 운영 최적화2 주안정적 운영OTel 관측성, 지연 모니터링, 장애 복구모니터링 대시보드
6. 고급 주제지속확장·최적화Event Sourcing, Saga, Serverless고급 설계 문서

학습 항목 매트릭스

카테고리Phase항목중요도설명
기초1CQS 원칙필수CQRS 의 이론적 기반 (명령/조회 분리).
기초1CQRS 핵심 개념필수읽기/쓰기 모델·인터페이스를 분리해 독립 최적화.
기초1최종 일관성필수비동기 전파로 읽기 최신성이 지연될 수 있음을 이해.
기초1CAP 맥락 이해권장가용성/일관성·분할 내성 트레이드오프 감각 잡기.
이론2도메인 모델링 (애그리게이트)필수쓰기 모델의 트랜잭션 경계·불변식 설계.
이론2프로젝션/뷰 설계필수조회 특화 머티리얼라이즈드 뷰 설계 원칙.
이론2도메인 이벤트권장상태 변화 사실을 이벤트로 표준화 (의미·스키마).
구현4기본 CQRS 구현필수Command/Query 핸들러·Read/Write 모델 분리.
구현4메시지 브로커 도입권장비동기 이벤트 전달 (Kafka/RabbitMQ 등).
구현4Transactional Outbox필수급 권장DB 쓰기 + 이벤트 발행 원자성 보장.
구현5프로젝션 파이프라인권장이벤트→읽기 모델 갱신 (실시간/배치).
구현5폴리글랏 퍼시스턴스권장읽기/쓰기 저장소를 목적별로 분리·튜닝.
구현5MSA 통합 (선택)선택서비스 경계 분리·API 게이트웨이·스키마 경계.
운영6모니터링/관측권장읽기 p95·뷰 갱신 지연·Outbox 미전송·중복률.

용어 정리

카테고리용어정의관련/주의
핵심 패턴·원칙CQRS읽기 (Query) 와 쓰기 (Command) 모델·경로를 분리하는 설계 패턴. 성능·확장성 최적화가 목적대부분 시스템엔 과한 복잡성일 수 있음. 선별 적용 권장.
CQS쿼리는 부작용 없고, 명령은 결과를 반환하지 않는 메서드 수준 규율CQRS 의 사상적 기반.
Event Sourcing모든 상태 변경을 이벤트로 영속화하고 이벤트 재생으로 상태 복원CQRS 와 자주 결합되나 필수 아님.
도메인/경계AggregateDDD 의 일관성 경계. 명령 처리와 상태 변경의 단위CQRS 쓰기 모델의 핵심 단위
Bounded Context모델/언어의 경계를 명시하는 DDD 전략 개념마이크로서비스 경계 설정에 활용.
구성요소/역할Command시스템 상태를 변경하는 요청/메시지Command Handler 와 1:1 관계
Query상태를 조회하는 요청 (부작용 없음)Read Model 만 접근
Command Handler명령 검증·도메인 규칙 실행·쓰기 저장수행Outbox 와 함께 이벤트 발행 연결
Query Handler조회 요청을 처리하고 읽기 모델에서 응답Read DB 최적화와 결합
Projection이벤트를 소비해 읽기 모델 (물질화 뷰) 을 갱신하는 프로세스Denormalizer 라고도 부름
Read Model / Write Model조회 전용 모델 / 상태 변경 전용 모델분리 저장소 권장 (성능·독립 스케일)
Event Store이벤트 소싱용 이벤트 영속 저장소스냅샷·리플레이 지원
데이터·저장소Materialized View읽기 성능을 위해 비정규화된 실시간/준실시간 뷰Projection 결과물
Snapshot특정 시점 Aggregate 상태 스냅샷긴 리플레이 시간 단축
Polyglot Persistence워크로드별 최적화된 DB 를 혼합 사용Read/Write 저장소 이기종화
메시징/동기화Event Broker이벤트 발행/구독 인프라 (Kafka, RabbitMQ 등)전달 보장·순서·중복 고려
Delivery Semanticsat-most/at-least/ exactly-once 전달 의미EOS 는 범위·전제조건이 있음.
Consumer Lag브로커 오프셋 대비 소비 지연읽기 모델 랙 지표로 활용
신뢰성·일관성 패턴Transactional OutboxDB 트랜잭션 안에서 아웃박스 레코드를 기록하고 CDC/퍼블리셔로 브로커 전송Dual-write 문제 표준 해법.
Saga(보상 트랜잭션)로컬 트랜잭션들의 연쇄와 보상 처리로 분산 일관성 확보오케스트레이션/코레오그래피.
Idempotence같은 메시지를 여러 번 처리해도 결과가 동일중복·재시도 안전 처리의 기본

참고 및 출처