Cloud-Native Architecture#
Cloud-Native Architecture 는 클라우드 컴퓨팅의 분산형, 동적, 탄력적 특성을 활용하여 현대적인 애플리케이션을 설계하고 운영하는 포괄적인 방법론이다. 전통적인 모놀리식 아키텍처와 달리 마이크로서비스로 분해된 애플리케이션을 컨테이너로 패키징하고, Kubernetes 와 같은 오케스트레이션 플랫폼에서 실행한다. 자동화, 관찰가능성, 무결성 인프라스트럭처를 통해 고가용성과 확장성을 보장하며, DevOps 문화와 긴밀히 연계되어 지속적인 통합 및 배포를 실현한다.
Cloud-Native Architecture 의 등장 배경은 전통적인 IT 인프라스트럭처와 애플리케이션 개발 방식의 한계에서 시작된다.
전통적 아키텍처의 한계#
모놀리식 애플리케이션의 문제점
- 대규모 단일 코드베이스로 인한 개발 및 유지보수의 복잡성
- 부분적 확장의 어려움 (전체 애플리케이션을 스케일링해야 함)
- 단일 장애점으로 인한 전체 시스템 다운 위험
- 기술 스택 변경의 어려움
온프레미스 인프라스트럭처의 제약
- 고정된 하드웨어 리소스로 인한 비효율성
- 수동적인 인프라 관리로 인한 높은 운영 비용
- 확장성과 탄력성의 부족
- 장시간의 배포 및 복구 프로세스
클라우드 컴퓨팅의 등장#
클라우드의 혁신적 특성
- 온디맨드 리소스 프로비저닝
- 사용량 기반 과금 모델
- 글로벌 분산 인프라스트럭처
- 관리형 서비스의 풍부한 생태계
대규모 인터넷 기업들의 선도적 사례
- Google 의 Borg 시스템 (Kubernetes 의 전신)
- Netflix 의 Hystrix 와 마이크로서비스 아키텍처
- Twitter 의 Finagle 라이브러리
- Amazon 의 SOA (Service-Oriented Architecture) 전환
기술적 진화의 촉진 요인#
컨테이너화 기술의 성숙
- Docker 의 등장으로 애플리케이션 패키징 혁신
- 경량화된 가상화 기술의 대중화
- 개발과 운영 환경의 일관성 확보
오픈소스 생태계의 발전
- Cloud Native Computing Foundation (CNCF) 의 설립 (2015 년)
- Kubernetes 의 오픈소스화 및 표준화
- 다양한 클라우드 네이티브 도구들의 성숙
목적 및 필요성#
비즈니스 목적#
민첩성 향상
- 빠른 기능 개발 및 배포를 통한 시장 대응력 강화
- 독립적인 팀 운영으로 개발 속도 가속화
- 실험과 혁신을 위한 안전한 환경 제공
운영 효율성 증대
- 자동화를 통한 운영 비용 절감
- 리소스 사용량 최적화로 인한 비용 효율성
- 장애 복구 시간 단축
확장성 확보
- 비즈니스 성장에 따른 유연한 시스템 확장
- 글로벌 서비스 제공을 위한 지리적 분산
- 피크 트래픽 대응 능력 향상
기술적 필요성#
복잡성 관리
- 대규모 시스템을 관리 가능한 단위로 분해
- 각 서비스별 독립적인 기술 스택 선택 가능
- 팀별 책임 영역 명확화
안정성 및 복원력
- 분산 시스템의 장애 격리
- 자동 복구 및 자가 치유 능력
- 무중단 배포 및 롤백 기능
보안 강화
- 제로 트러스트 보안 모델 구현
- 마이크로세그멘테이션을 통한 공격 표면 최소화
- 컴플라이언스 요구사항 충족
현대적 개발 문화 지원#
DevOps 문화 활성화
- 개발과 운영의 협업 강화
- 지속적 통합 및 배포 (CI/CD) 문화 정착
- 피드백 기반의 지속적 개선
개발자 경험 향상
- 개발 환경과 운영 환경의 일관성
- 인프라 관리 부담 경감
- 비즈니스 로직에 집중할 수 있는 환경
핵심 개념#
Cloud-Native Architecture는 클라우드 환경에서 애플리케이션을 설계, 개발, 배포하는 아키텍처 스타일로, 확장성, 유연성, 자동화, 마이크로서비스, 컨테이너화 등을 특징으로 한다.
기본 핵심 개념#
마이크로서비스 아키텍처 (Microservices Architecture)
- 애플리케이션을 독립적으로 배포 가능한 작은 서비스들로 분해
- 각 서비스는 단일 비즈니스 기능에 집중
- API 를 통한 느슨한 결합 (Loose Coupling) 구현
컨테이너화 (Containerization)
- 애플리케이션과 그 종속성을 하나의 패키지로 묶는 기술
- 운영체제와 하드웨어로부터 독립성 제공
- Docker 와 같은 컨테이너 런타임 활용
컨테이너 오케스트레이션 (Container Orchestration)
- 컨테이너의 배포, 확장, 관리를 자동화
- Kubernetes 가 사실상의 표준
- 서비스 디스커버리, 로드 밸런싱, 자동 복구 기능 제공
무상태 설계 (Stateless Design)
- 서버가 클라이언트의 상태 정보를 저장하지 않음
- 수평 확장과 장애 복구를 용이하게 함
- 세션 데이터는 외부 저장소에 분리 보관
심화 핵심 개념#
서비스 메시 (Service Mesh)
- 마이크로서비스 간 통신을 관리하는 전용 인프라스트럭처 계층
- 사이드카 패턴을 통한 네트워크 프록시 구현
- 보안, 관찰가능성, 트래픽 관리 기능 제공
불변 인프라스트럭처 (Immutable Infrastructure)
- 배포 후 서버나 컨테이너를 변경하지 않는 원칙
- 문제 발생 시 기존 인스턴스를 폐기하고 새로운 인스턴스로 교체
- 보안성과 일관성 향상
선언적 API (Declarative APIs)
- 원하는 상태를 선언하여 시스템이 자동으로 해당 상태로 수렴
- Kubernetes 의 YAML 매니페스트가 대표적 예시
- GitOps 와 Infrastructure as Code (IaC) 활성화
방어 심층 보안 (Defense in Depth)
- 모든 구성 요소 간 인증 및 암호화 적용
- Zero Trust 보안 모델 구현
- 내부와 외부 구분 없이 모든 통신을 검증
실무 구현을 위한 연관 개념#
DevOps 및 CI/CD
- 개발과 운영의 통합을 통한 지속적 배포
- 자동화된 테스트, 빌드, 배포 파이프라인
- 피드백 루프를 통한 빠른 개선 사이클
관찰가능성 (Observability)
- 메트릭, 로그, 분산 추적을 통한 시스템 모니터링
- Prometheus, Grafana, Jaeger 등의 도구 활용
- 시스템 상태의 실시간 가시성 확보
Infrastructure as Code (IaC)
- 인프라스트럭처를 코드로 정의하고 관리
- Terraform, Ansible, Helm 등의 도구 사용
- 버전 관리와 재현 가능한 환경 구축
실무에서 구현하기 위해 필요한 내용#
- 마이크로서비스 설계: 독립적 서비스 단위로 분리 및 경계 설정.
- 컨테이너 및 오케스트레이션: 컨테이너 기반 배포, Kubernetes 등 오케스트레이션 도구 활용.
- CI/CD 파이프라인: 자동화된 빌드, 테스트, 배포.
- 관측성: 모니터링, 로깅, 트레이싱 통합.
- 보안 및 규정 준수: 서비스 간 통신, 데이터 보호, 인증/인가 강화.
주요 기능 및 역할#
핵심 기능:
- 자동 확장 (Auto-scaling): 부하에 따른 자동 리소스 조정
- 장애 복구 (Fault Recovery): 자동 장애 감지 및 복구
- 서비스 디스커버리 (Service Discovery): 동적 서비스 위치 발견
- 로드 밸런싱 (Load Balancing): 트래픽 분산 처리
주요 역할:
- 개발 생산성 향상: 개발자가 비즈니스 로직에 집중 가능
- 운영 안정성 보장: 자동화를 통한 휴먼 에러 감소
- 확장성 제공: 수요 변화에 따른 유연한 리소스 관리
- 비용 효율성: 필요한 만큼만 리소스 사용
기술적 특징:
- 느슨한 결합 (Loosely Coupled): 서비스 간 독립성 보장
- 상태 비저장 (Stateless): 서비스 인스턴스 간 상태 공유 없음
- 탄력적 (Elastic): 수요에 따른 자동 확장/축소
- 복원력 (Resilient): 장애 상황에서도 서비스 지속성 유지
운영적 특징:
- 자동화 중심: 수동 개입 최소화
- 지속적 배포: 빈번하고 안정적인 릴리스
- 관찰가능성: 시스템 상태의 실시간 모니터링
- 보안 내재화: 개발 프로세스에 보안 통합
핵심 원칙#
분류 | 원칙 | 설명 |
---|
설계 원칙 | 클라우드 우선 설계 (Design for Cloud) | 분산·탄력적 특성을 활용하고, 온디맨드 리소스 및 관리형 서비스 기반 설계 |
| 마이크로서비스 분해 (Microservices Decomposition) | 단일 책임 원칙 기반의 도메인 분리, DDD 및 API 우선 설계 적용 |
| 무상태 설계 (Stateless Design) | 상태 외부화 및 세션 분리 저장을 통해 수평 확장이 가능한 구조 구현 |
운영 원칙 | 자동화 우선 (Automation First) | IaC 기반 운영 자동화 및 반복 작업의 스크립팅, 선언형 구성 적용 |
| 방어 심층 보안 (Defense in Depth) | 제로 트러스트 보안, 암호화, 네트워크 세분화, 구성 요소 간 상호 인증 |
| 관찰가능성 내장 (Built-in Observability) | 메트릭, 로그, 트레이싱 표준화 및 실시간 모니터링 내장 설계 |
문화 원칙 | 지속적 진화 (Continuous Evolution) | CI/CD 기반 반복적 개선, 실험과 피드백을 통한 진화적 개발 |
| 팀 자율성 (Team Autonomy) | 독립적인 팀 운영, 기술 스택 선택 자율성, 책임과 권한의 균형 |
작동 원리#
Cloud-Native 아키텍처는 마이크로서비스를 컨테이너로 패키징되고, Kubernetes 등 오케스트레이션 도구를 통해 배포·확장·복구가 자동화된다. CI/CD 파이프라인을 통해 자동 배포 및 테스트가 이루어지며, 모니터링/로깅, 서비스 메시/API 게이트웨이를 통해 서비스 간 통신과 관측성이 관리된다.
graph TD
A[Developer] -->|Code Push| B[Git Repository]
B --> C[CI/CD Pipeline]
C --> D[Container Build]
D --> E[Container Registry]
E --> F[Kubernetes Orchestrator]
F --> G[Service Mesh]
G --> H[Microservices]
H --> I[Data Storage]
J[Load Balancer] --> G
K[Monitoring] --> H
L[Logging] --> H
M[Service Discovery] --> H
style F fill:#e1f5fe
style G fill:#f3e5f5
style H fill:#e8f5e8
- 코드 커밋: 개발자가 변경사항을 버전 관리 시스템에 커밋
- 자동 빌드: CI/CD 파이프라인이 자동으로 컨테이너 이미지 생성
- 배포: 오케스트레이터가 컨테이너를 적절한 노드에 배포
- 서비스 통신: 서비스 메시를 통한 안전한 마이크로서비스 간 통신
- 모니터링: 실시간 시스템 상태 관찰 및 대응
전체 작동 원리#
graph TB
subgraph "클라이언트 계층"
A[웹 브라우저]
B[모바일 앱]
C[외부 API]
end
subgraph "API 게이트웨이 계층"
D[API Gateway]
E[로드 밸런서]
end
subgraph "서비스 메시 계층"
F[사이드카 프록시]
G[서비스 디스커버리]
H[보안 정책]
end
subgraph "마이크로서비스 계층"
I[사용자 서비스]
J[주문 서비스]
K[결제 서비스]
L[알림 서비스]
end
subgraph "데이터 계층"
M[사용자 DB]
N[주문 DB]
O[결제 DB]
P[메시지 큐]
end
subgraph "인프라 계층"
Q[Kubernetes 클러스터]
R[컨테이너 레지스트리]
S[모니터링 시스템]
end
A --> D
B --> D
C --> D
D --> E
E --> F
F --> G
F --> H
G --> I
G --> J
G --> K
G --> L
I --> M
J --> N
K --> O
L --> P
I --> Q
J --> Q
K --> Q
L --> Q
Q --> R
Q --> S
요청 처리 플로우#
요청 수신 및 라우팅
- 클라이언트 요청이 API 게이트웨이로 전달
- 인증, 인가, 레이트 리미팅 적용
- 적절한 마이크로서비스로 라우팅
서비스 메시를 통한 통신
- 사이드카 프록시가 서비스 간 통신 중재
- 로드 밸런싱, 서킷 브레이커, 재시도 로직 적용
- 분산 추적 및 메트릭 수집
마이크로서비스 처리
- 각 서비스가 독립적으로 비즈니스 로직 수행
- 필요시 다른 서비스와의 동기/비동기 통신
- 상태 정보는 외부 데이터 저장소에 유지
응답 반환 및 모니터링
- 처리 결과를 클라이언트로 반환
- 요청 플로우 전체에 대한 추적 정보 수집
- 성능 메트릭 및 오류 로그 기록
자동화 워크플로우#
sequenceDiagram
participant Dev as 개발자
participant Git as Git Repository
participant CI as CI Pipeline
participant CD as CD Pipeline
participant K8s as Kubernetes
participant Mon as Monitoring
Dev->>Git: 코드 커밋
Git->>CI: 웹훅 트리거
CI->>CI: 코드 빌드 및 테스트
CI->>CI: 컨테이너 이미지 생성
CI->>CD: 배포 트리거
CD->>K8s: 새 버전 배포
K8s->>K8s: 롤링 업데이트 수행
K8s->>Mon: 배포 상태 전송
Mon->>Dev: 배포 결과 알림
이 시퀀스 다이어그램은 개발자의 코드 커밋부터 운영 환경 배포까지의 자동화된 워크플로우를 보여줍니다. CI/CD 파이프라인을 통해 코드 변경사항이 자동으로 테스트되고 배포되며, 모니터링 시스템이 전체 프로세스를 추적합니다.
주요 원리#
아키텍처 원리#
서비스 분해 원리
graph TD
A[모놀리식 애플리케이션] --> B[도메인 분석]
B --> C[바운디드 컨텍스트 식별]
C --> D[마이크로서비스 정의]
D --> E[API 설계]
E --> F[독립 배포 가능한 서비스]
G[사용자 관리] --> H[주문 처리]
H --> I[결제 처리]
I --> J[배송 관리]
J --> K[재고 관리]
위 다이어그램은 모놀리식 애플리케이션을 마이크로서비스로 분해하는 과정을 보여준다. 도메인 분석을 통해 바운디드 컨텍스트를 식별하고, 각각을 독립적인 마이크로서비스로 구현한다.
통신 패턴 원리
- 동기 통신: HTTP/REST API, gRPC
- 비동기 통신: 메시지 큐, 이벤트 스트리밍
- 사이드카 패턴: 서비스 메시 구현
데이터 관리 원리#
데이터베이스 분리 원리
- 각 마이크로서비스는 고유한 데이터베이스 보유
- 서비스 간 데이터 공유는 API 를 통해서만 수행
- 이벤트 소싱을 통한 데이터 일관성 관리
CQRS (Command Query Responsibility Segregation) 원리
- 명령 (쓰기) 과 조회 (읽기) 모델의 분리
- 각각에 최적화된 데이터 저장소 사용
- 읽기 성능과 쓰기 성능의 독립적 최적화
구조 및 아키텍처#
Cloud-Native Architecture 는 여러 계층으로 구성된 분산 시스템이다. 각 계층은 특정한 역할과 책임을 가지며, 서로 느슨하게 결합되어 있다.
graph TD
%% Entry Layer
User[사용자] --> Gateway["API Gateway (NGINX / Kong)"]
%% Service Layer
Gateway --> SVC1[User Service]
Gateway --> SVC2[Order Service]
Gateway --> SVC3[Product Service]
%% Service Mesh & Observability
SVC1 --> SM["Service Mesh (Istio)"]
SVC2 --> SM
SVC3 --> SM
SM --> OB["Observability (Prometheus + Jaeger + Grafana)"]
%% Message Bus
SVC1 --> MQ["Message Broker (Kafka / RabbitMQ)"]
SVC2 --> MQ
SVC3 --> MQ
%% Storage
SVC1 --> DB1["(PostgreSQL)"]
SVC2 --> DB2["(MongoDB)"]
SVC3 --> Cache[Redis]
%% CI/CD & Infra
Dev[Developer] --> CI["CI/CD Pipeline (GitLab CI / ArgoCD)"]
CI --> K8s[Kubernetes Cluster]
K8s --> SVC1
K8s --> SVC2
K8s --> SVC3
구성요소#
구분 | 구성요소 | 기능 | 역할 | 특징 / 기술 예시 | 적용 시나리오 또는 조건 |
---|
필수 | 컨테이너 런타임 | 컨테이너 이미지 실행 및 관리 | 라이프사이클 관리, 리소스 격리, 이미지 캐싱 | Docker, containerd, CRI-O 등 | 모든 컨테이너 기반 아키텍처의 기본 기반 |
| 컨테이너 오케스트레이션 | 컨테이너 클러스터 관리 및 스케줄링 | 배치, 디스커버리, 로드 밸런싱, 스케일링, 장애 복구 | Kubernetes (표준), 선언적 API | 수백 개 이상의 컨테이너/서비스 관리 필요 시 |
| 마이크로서비스 | 비즈니스 로직 단위의 독립적 서비스 | 도메인 기능 담당, 독립 배포, API 통신 | 단일 책임 원칙, DB 분리, 확장성 우수 | 유연한 배포, 독립 확장성, 도메인 기반 구조 필요 시 |
| API 게이트웨이 | 외부 요청의 통합 진입점 제공 | 인증, 인가, 요청 라우팅/집계, 속도 제한 | NGINX, Kong, Ambassador 등 | 외부 API 제어, 보안, 요청 관리가 필요한 구조 |
선택 | 서비스 메시 | 마이크로서비스 간 통신 제어 및 보안 | 트래픽 제어, 관찰성, 정책 관리, mTLS 적용 | Istio, Linkerd, Consul Connect 등 | 트래픽 라우팅, 사이드카 기반 보안 통신, 메쉬 통신 제어 필요 시 |
| 메시지 브로커 | 비동기 메시징 및 이벤트 기반 통신 | 서비스 간 결합 최소화, 이벤트 처리, 순서 보장 | Apache Kafka, RabbitMQ, Amazon SQS 등 | 이벤트 기반 처리, 데이터 스트리밍, 확장성 필요한 시스템 |
| 분산 캐시 | 고속 데이터 접근을 위한 캐싱 | DB 부하 감소, 응답 속도 향상, 세션 공유 | Redis, Memcached, Hazelcast 등 | 읽기 많은 트래픽, 응답 지연 최소화 필요 시 |
| 관찰가능성 스택 | 시스템 상태 추적 및 분석 | 메트릭 수집, 로그 집계, 분산 트레이싱 | Prometheus, Grafana, ELK Stack, Jaeger 등 | 장애 분석, 성능 모니터링, 운영 자동화 필요한 환경 |
구성요소별 기술 스택 매핑#
구성요소 | 기능 요약 | 주요 스택 예시 | 선택 기준 요약 |
---|
API Gateway | 외부 요청 라우팅 및 정책 적용 | NGINX, Envoy, Kong, Ambassador, AWS/GCP Gateway | 오픈소스 여부, 인증/인가, 성능, 통합 기능 여부 |
Service Mesh | 서비스 간 트래픽 제어 및 보안 | Istio, Linkerd, Consul Connect, Kuma | 정책 관리 필요 여부, 리소스 오버헤드, 설치 난이도, 운영 UI 지원 여부 |
Message Broker | 비동기 이벤트 및 메시지 전달 | Apache Kafka, RabbitMQ, NATS, Amazon SQS | Throughput, 내구성, 순서 보장 여부, 운영 복잡도 |
Container Runtime | 애플리케이션 실행 환경 | Docker, containerd, CRI-O | 보안 격리 수준, 커뮤니티 지원, 오케스트레이터 호환성 |
Orchestration | 컨테이너 관리 및 배치 | Kubernetes, OpenShift, Rancher | 멀티 클러스터 관리, 선언형 지원, 보안/네트워크 통합성 |
Observability | 모니터링, 로깅, 추적 통합 | Prometheus, Grafana, ELK Stack, Jaeger, Loki | 메트릭 정밀도, 시각화 도구 통합, 분산 추적 필요 여부 |
CI/CD | 자동 빌드, 테스트, 배포 | Jenkins, GitLab CI/CD, ArgoCD, Tekton | 선언형 배포 필요성, GitOps 여부, 확장성 |
Secret Management | 보안 구성요소의 저장 및 접근 제어 | HashiCorp Vault, AWS Secrets Manager, Sealed Secrets | 외부 통합 여부, 감사 로그, 자동화 지원 여부 |
Distributed Cache | 데이터 캐싱 및 성능 향상 | Redis, Memcached, Hazelcast | 읽기 집중 시스템 여부, 세션 공유 필요 여부 |
도입 판단 기준 체크리스트#
구성요소 | 판단 기준 질문 | 도입 권장 조건 요약 |
---|
API Gateway | 인증, 속도 제한, 트래픽 라우팅이 필요한가? | 다양한 API 엔드포인트 노출 및 인증/정책 통합 필요 시 |
Service Mesh | mTLS, 트래픽 셰이핑, A/B 테스팅 필요성 있는가? | 서비스 간 트래픽 보안, 라우팅 정책, 관찰성 향상이 중요한 경우 |
Message Broker | 서비스 간 느슨한 결합이 필요한가?대용량 이벤트를 처리해야 하는가? | 비동기 처리, 이벤트 기반 구조 필요 시 |
CI/CD | 수동 배포로 인한 장애/지연이 발생하는가?자동 테스트와 롤백이 필요한가? | 빈번한 배포, 자동화 기반 릴리즈 파이프라인 필요 시 |
Observability | 분산 환경에서 원인 분석이 어려운가?장애 대응 속도에 문제가 있는가? | 모니터링/로깅/트레이싱 통합 필요 시 |
Secret 관리 | 민감한 정보가 코드에 하드코딩되고 있는가?권한 관리와 감사 로그가 필요한가? | 보안 감사, 키 관리 체계화, 운영/개발 분리 필요 시 |
Orchestration | 수백 개 이상의 컨테이너 관리가 필요한가?자동 복구와 스케일링이 요구되는가? | 다수의 서비스 운영, 자원 자동 조정 및 탄력성이 필요한 경우 |
Cache | 응답 속도 향상이 필요한가?읽기 요청 비중이 높은가? | 세션 공유, 데이터베이스 부하 감소 필요 시 |
카테고리 | 항목 | 설명 | 주요 기술 요소 |
---|
확장성과 유연성 | 확장성 (Scalability) | 서비스 단위 수평 확장 가능, 트래픽 증가에 따라 자동 확장 (Auto-scaling) | 마이크로서비스, 컨테이너, Kubernetes, HPA |
| 유연성 (Flexibility) | 서비스 단위 배포/업데이트 가능, 변경에 따른 영향 최소화 | 독립 배포, 도메인 주도 설계 |
복원력 | 탄력성 (Resilience) | 장애 격리 및 자동 복구 기능을 통해 시스템 전체로의 장애 전파 방지 | 서킷 브레이커, 헬스체크, Kubernetes 복구 |
| 복원력 (Recoverability) | 자동 재시작, 상태 비저장 (stateless) 설계로 고가용성 보장 | Liveness/Readiness Probe, Stateless Design |
개발 생산성 | 배포 민첩성 (Deployment Agility) | 빠른 배포, 롤백, 테스트 가능. 기능 단위로 CI/CD 파이프라인 자동화 | CI/CD, GitOps, ArgoCD, Tekton |
| 개발 속도 향상 | 팀별 독립 개발 및 기술 선택 가능, 반복적 기능 출시 속도 증가 | 팀별 마이크로서비스, DevOps 문화 |
기술 다양성 | 기술 이기종성 지원 | 서비스별 최적 기술 스택 사용 가능 (ex. Node.js, Go, Python, Rust 등 혼합) | Polyglot Architecture |
운영 효율성 | 인프라 효율성 | 불변 인프라, IaC 기반 운영 자동화, 반복 가능하고 안정적인 배포 가능 | Terraform, Helm, Kustomize, Immutable Infra |
| 운영 자동화 | 운영 비용 절감, 인적 오류 감소 | IaC, 오케스트레이션, 자동화 스크립트 |
이식성 | 환경 독립성 (Portability) | 컨테이너화를 통해 멀티 클라우드, 하이브리드 클라우드 간 이동 및 실행 환경 일관성 확보 | Docker, Kubernetes, OCI |
관찰가능성 | 관측성 (Observability) | 모니터링, 로깅, 트레이싱 통합으로 문제 원인 추적 용이, 시스템 상태 실시간 확인 가능 | OpenTelemetry, Prometheus, Grafana, Jaeger |
보안성 | 보안 강화 | 마이크로세그멘테이션, 제로 트러스트 모델, mTLS 적용으로 보안 정책 세분화 및 공격 표면 최소화 | Zero Trust, mTLS, OPA, 서비스 메시 |
단점과 문제점 그리고 해결방안#
카테고리 | 항목 | 설명 | 해결책/완화 전략 |
---|
설계 복잡성 | 아키텍처 복잡성 | 마이크로서비스 분리, 서비스 메시, 이벤트 브로커 등 다양한 요소 도입으로 전체 구조 복잡 | 표준화된 설계 가이드 도입, 마이크로서비스 분리 기준 정립, DDD 적용 |
초기 투자 | 도입 비용 | 인프라 구성, 학습 및 도구 도입에 따른 초기 진입 장벽 존재 | 클라우드 매니지드 서비스 활용, 오픈소스 도구 중심의 도입, 단계적 확산 |
운영 복잡성 | 배포 및 모니터링 오버헤드 | CI/CD, 모니터링, 로깅, 트레이싱 등 다수의 운영 요소 관리 필요 | 통합 DevOps 플랫폼 도입, GitOps, 자동화 파이프라인 설계 |
성능 문제 | 네트워크 지연 | 서비스 간 통신 증가로 인한 네트워크 지연 시간 누적 | gRPC 채택, 캐싱, 메시지 버퍼링, 로컬 처리 로직 추가 |
데이터 처리 | 일관성 유지 어려움 | 분산 DB 및 상태 관리로 인한 트랜잭션 및 정합성 처리 복잡성 | Event Sourcing, Saga, CQRS, 최종 일관성 수용 |
리소스 관리 | 리소스 과소/과다 할당 | 수많은 컨테이너의 리소스 제어 어려움으로 인한 오버헤드 가능성 | HPA/VPA 활용, 클러스터 오토스케일링, 프로파일링 도입 |
보안 부담 | 확장된 공격 표면 | 서비스 수 증가 → 인증·인가, 통신 암호화, 이미지 보안 등 고려 영역 증가 | DevSecOps, mTLS, 취약점 스캐닝 자동화, 사이드카 보안 적용 |
문제점#
카테고리 | 문제 항목 | 원인 | 영향 | 탐지 및 진단 도구 | 예방 전략 | 해결 전략 |
---|
성능 문제 | 네트워크 지연 | 서비스 간 과도한 호출, 프록시 레이어 증가 | 처리 지연, 응답 시간 증가 | Prometheus, Grafana, APM | 캐싱 도입, gRPC 사용, 호출 횟수 제한 | 서비스 메시 최적화, 호출 최소화 |
시스템 안정성 | 분산 시스템 카오스 | 네트워크 단절, 의존성 고리 증가로 인한 장애 전파 | 연쇄 장애, 시스템 불안정 | 서비스 맵, 분산 추적, 카오스 테스트 도구 | 서킷 브레이커, 타임아웃 설정 | 장애 격리 패턴, 카오스 엔지니어링 적용 |
리소스 충돌 | 컨테이너 리소스 경합 | CPU, Memory 할당 미흡 또는 과다 설정 | OOM, 스로틀링, 시스템 과부하 | kube-metrics-server, APM, cAdvisor | 수직 확장, 자원 자동 조정 정책 적용 | 자원 제한 재조정, 클러스터 오토스케일링 |
데이터 문제 | 데이터 불일치 | 네트워크 파티션, 중복 요청, 동시성 처리 실패 | 데이터 무결성 훼손, 사용자 혼란 | 데이터 무결성 검증, 로그 분석, 보상 트랜잭션 모니터링 | 멱등성 처리, 이벤트 소싱, 트랜잭션 보상 처리 | 복구 프로세스 설계, 운영자 승인 기반 수동 처리 |
보안 이슈 | 보안 취약점 | 이미지 취약점, 네트워크 노출, 인증·인가 누락 | 데이터 유출, 악성 침입 | 취약점 스캐너 (Trivy, Snyk), 감사 로그 분석 | 보안 코드 리뷰, 네트워크 정책 적용, mTLS | 자동 패치 시스템, 정기 보안 감사 |
관측성 부족 | 장애 원인 불분명 | 분산 시스템의 복잡성으로 인한 로그, 메트릭, 트레이스 통합 어려움 | 문제 진단 지연, MTTR 증가 | OpenTelemetry, Jaeger, Loki, Kibana | 표준 로깅/트레이싱 포맷 통일, 로그 집계 시스템 구성 | 관측성 강화, 서비스 별 상태 시각화 대시보드 구축 |
도전 과제#
카테고리 | 도전 과제 | 원인/설명 | 해결책/전략 | |
---|
기술적 과제 | 데이터 일관성 | 마이크로서비스 간 개별 DB 사용, 분산 트랜잭션 부재 | 이벤트 소싱, CQRS, SAGA 패턴, 최종 일관성 모델 적용 | |
| 분산 시스템 복잡성 | 수십~수백 개 서비스, 오케스트레이션 복잡도 증가 | 서비스 메시, 통합 오케스트레이션 (Kubernetes), IaC 기반 자동화 | |
| 서비스 간 통신 지연/오버헤드 | HTTP/JSON 직렬화, 네트워크 지연, 불필요한 연쇄 호출 | gRPC 도입, 비동기 메시징, 캐시 레이어, 서비스 콜로케이션 | |
| 멀티 클라우드 관리 | 서로 다른 API 및 서비스 모델, 벤더 종속성 | Terraform/Pulumi, Anthos/Azure Arc, Helm/Kustomize 등 추상화 도구 활용 | |
| 관찰가능성 확보 | 서비스 간 상호작용이 복잡, 문제 원인 파악 어려움 | OpenTelemetry, 통합 모니터링 스택 (Prometheus, Grafana, ELK), 이상 탐지 시스템 구축 | |
| 보안 관리 | 다중 API 및 통신 채널, 인증/인가 정책 복잡도 | DevSecOps 도입, mTLS, 중앙 인증 시스템 (OAuth2/OpenID), 정기 취약점 스캔 자동화 | |
운영적 과제 | 운영 자동화 및 관리 난이도 | 서비스/컨테이너 증가 → 인프라/배포 복잡도 증가 | GitOps, ArgoCD, Tekton, 자동 스케일링/복구 도입 | |
| 비용 최적화 | 리소스 과다 할당, 무분별한 확장 → 비용 증가 | 비용 할당 모니터링, HPA/스팟 인스턴스, 자동 스케일링 정책 개선 | |
| 레거시 시스템과 통합 | 온프레미스 - 클라우드 이질성, 기존 자산 활용 요구 | 하이브리드 클라우드 전략, API 게이트웨이, 점진적 마이그레이션 (스트랭글러 패턴) | |
| 실시간 대응/회복력 부족 | 장애 발생 시 원인 파악/복구 지연 | MTTR 기반 운영지표 도입, 자동 복구 설정, 카오스 엔지니어링 테스트 | |
조직적 과제 | DevOps 문화 정착 | 전통적 개발/운영 분리, 책임 영역 충돌 | 크로스 펑셔널 팀 구성, 지속적 교육, 점진적 조직 구조 전환 | |
| 스킬 격차 | 최신 기술 스택에 대한 경험 부족 | 내부 교육/멘토링, 외부 전문가 도입, 커뮤니티 활동 활성화 | |
| 규정 준수/보안 인증 | 산업별 법적 규제, 보안 정책 강화 필요 | 정책 코드화 (OPA), 감사 로그 저장, PCI-DSS/HIPAA 등 표준 대응 | |
- 기술적 도전 과제는 대부분 아키텍처 설계·구현의 선택과 직결되며, 설계 단계에서 미리 해결책을 통합하는 것이 중요.
- 운영적 과제는 자동화와 가시성 확보가 핵심이며, 오케스트레이션과 관찰가능성 도구가 필수.
- 조직적 과제는 기술보다 문화와 협업 방식의 변화가 요구되며, DevOps/Platform Engineering 기반의 접근이 필요.
분류 기준에 따른 종류 및 유형#
분류 기준 | 유형/모델 | 설명 및 특징 | 대표 예시 |
---|
배포 환경 | 퍼블릭 클라우드 | 공개 클라우드 환경에서 운영 | AWS, GCP, Azure |
| 프라이빗 클라우드 | 온프레미스 또는 전용 클라우드 인프라에서 운영 | OpenStack, VMware vSphere |
| 하이브리드 클라우드 | 퍼블릭과 프라이빗 클라우드 혼합 운영 | Red Hat OpenShift, Azure Arc |
| 멀티 클라우드 | 복수의 퍼블릭 클라우드를 병렬로 활용 | AWS + GCP 조합 운영 |
| 매니지드 서비스 | 클라우드 벤더에서 제공하는 관리형 플랫폼 사용 | GKE, EKS, AKS |
| 셀프 매니지드 | 자체적으로 Kubernetes 등을 설치·운영 | kubeadm, Rancher |
서비스 구조 | 마이크로서비스 | 독립적으로 배포 및 확장 가능한 작은 서비스 단위로 구성 | Netflix, Amazon |
| 미니 서비스 | 마이크로서비스보다 큰 단위로 적절히 나눈 중간 크기의 서비스 단위 | 일부 기업의 내부 BFF 서비스 구조 |
| 모듈러 모놀리스 | 단일 애플리케이션이지만 내부를 모듈화해 독립성 유지 | 초기 단계 스타트업, 단계적 마이그레이션 구조 |
컴퓨팅 모델 | 컨테이너 기반 | 컨테이너화된 애플리케이션과 런타임을 통해 일관성 있고 이식 가능한 배포 가능 | Docker, Kubernetes, containerd |
| 서버리스 | 인프라를 숨기고 함수 단위로 실행, 이벤트 기반 트리거 지원 | AWS Lambda, Azure Functions |
| 불변 인프라 | 코드로 정의된 인프라를 배포 후 변경하지 않고 재생성 | Terraform, Pulumi, Immutable AMI |
통신 방식 | 동기 통신 | 요청 - 응답 기반의 직접 API 호출 | REST, gRPC |
| 비동기 통신 | 메시지 브로커를 통한 비동기 메시지 전달 | Kafka, RabbitMQ |
| 이벤트 기반 | 이벤트 발행 - 구독을 중심으로 상태 변화 모델링 | Event Sourcing, CQRS |
| 직접 통신 | API Gateway 없이 서비스 간 직접 호출 | 내부 서비스 API 통신 |
데이터 관리 | 서비스별 데이터베이스 분리 | 각 서비스가 고유한 데이터베이스를 가짐 | Microservices with DB-per-service |
| 공유 데이터베이스 | 여러 서비스가 하나의 데이터베이스를 공유 | 기존 모놀리스 시스템에서 마이그레이션 중인 구조 |
| 이벤트 기반 데이터 동기화 | 이벤트 로그를 기반으로 상태를 동기화, 복구도 가능 | Kafka + Event Sourcing |
네트워킹 구조 | 서비스 메시 | 서비스 간 트래픽 관리, 보안, 정책 적용 등을 위한 전용 계층 구성 | Istio, Linkerd |
| API 게이트웨이 | 외부 요청의 단일 진입점, 인증/인가, 라우팅 정책 적용 | Kong, Envoy, AWS API Gateway |
| 직접 통신 | 서비스 간 직접 API 호출, 간단한 구조, 낮은 지연 시간 | REST/gRPC 호출 |
클라우드 적합도 | 클라우드 네이티브 | 처음부터 클라우드를 위해 설계됨 | 12-Factor App, Kubernetes-native 구조 |
| 클라우드 지원 | 기존 앱을 클라우드 환경으로 포팅함 | 모놀리스 앱을 Lift-and-Shift 방식으로 이전 |
| 클라우드 최적화 | 클라우드 특성을 일부 반영해 구조 개선 | DB 연결 풀링, S3 등 관리형 서비스로 전환 |
- 배포/환경 관점: 클라우드 운영 형태 (퍼블릭, 프라이빗, 하이브리드, 매니지드, 셀프 매니지드)
- 구조적 관점: 마이크로서비스, 서버리스, 컨테이너화, 이벤트 기반, 직접 통신
- 운영/전환 관점: 클라우드 네이티브 ↔ 지원 ↔ 최적화 / 불변 인프라 전략
- 데이터/통신 관점: 동기/비동기, 데이터베이스 구조, 이벤트 기반 처리
실무 사용 예시#
분야 | 사례/적용 예시 | 주요 목적 | 기술 스택 | 기대 효과 |
---|
전자상거래 | 쇼핑몰 플랫폼 (예: Shopify, Amazon, 11 번가) | 고가용성, 빠른 배포, 트래픽 대응 | React, Node.js, MongoDB, Redis, Kubernetes, 서버리스 | 99.9% 가용성, 피크 트래픽 처리, 신속한 기능 릴리즈 |
금융/핀테크 | 결제 및 뱅킹 시스템 (예: Capital One, 카카오뱅크) | 실시간 트랜잭션 처리, 보안 강화 | Spring Boot, PostgreSQL, Kafka, API Gateway, Serverless | 초당 수만 건 처리, PCI DSS/HIPAA 준수, 규제 대응 |
미디어/스트리밍 | 글로벌 스트리밍 플랫폼 (예: Netflix, YouTube) | 대규모 콘텐츠 전송, 글로벌 확장 | Golang, Cassandra, CDN, S3, Chaos Engineering, Kubernetes | 글로벌 사용자 동시 접속 처리, 무중단 스트리밍 |
IoT/센서 데이터 | 실시간 IoT 플랫폼 | 고빈도 이벤트 수집 및 분석 | Python, Apache Pulsar, InfluxDB, Grafana | 초당 백만 건 데이터 수집, 실시간 분석 및 시각화 |
소셜 플랫폼 | SNS/메신저 시스템 (예: Slack, Twitter) | 콘텐츠 생성/알림, 관계 중심 데이터 관리 | React Native, GraphQL, Neo4j, Redis, ElasticSearch | 개인화 피드, 빠른 알림 전파, 확장성 높은 구조 |
게임/실시간 엔터테인먼트 | 멀티플레이어 게임 백엔드 | 실시간 상태 동기화, 전역 통신 | Unity, C#, Redis Cluster, WebSocket, Leaderboard | 게임 내 실시간 반응, 글로벌 매치 지원 |
헬스케어 | 의료 정보 시스템 (예: Philips HealthSuite) | 데이터 연계, 개인정보 보호, 원격 모니터링 | Java, FHIR, Apache Kafka, PostgreSQL, 인증 시스템 | HIPAA 대응, 안전한 데이터 교환, 환자 맞춤형 서비스 |
교육/에듀테크 | 온라인 학습 플랫폼 | 다중 사용자 수업 지원, 콘텐츠 분배, 실시간 피드백 | Vue.js, Laravel, MySQL, WebRTC | 학습 진도 추적, 실시간 수업, 유연한 콘텐츠 운영 |
DevOps 플랫폼 | SaaS/배포 자동화 | CI/CD 통합, 운영 자동화 | Docker, Jenkins, Terraform, ArgoCD | 빠른 릴리즈, 오류 최소화, 협업 가속화 |
공공/정부 시스템 | 클라우드 기반 민원 처리 시스템 | 대국민 안정적 서비스, API 기반 통합 | Kubernetes, API Gateway, OAuth, OpenID | 확장성, 보안성, 다양한 디바이스 대응 |
실시간 데이터 처리 | 데이터 흐름 처리 파이프라인 | 스트림 분석, 이벤트 기반 마이크로서비스 | Kafka, Flink, ArgoCD, CQRS, Event Sourcing | 낮은 지연, 높은 처리량, 유연한 데이터 분리 구조 |
- 도메인 다양성: 금융, 이커머스, 게임, IoT, 헬스케어 등 다양한 산업군에서 활용.
- 기술 스택 조합: 전통적인 백엔드 언어부터 최신 클라우드 네이티브 기술까지 혼합.
- 공통 패턴: 마이크로서비스, Kafka, API Gateway, 서버리스, Kubernetes, CI/CD.
- 핵심 효과: 실시간성, 고가용성, 확장성, 컴플라이언스 대응, 자동화.
활용 사례#
사례 1: 쿠팡의 주문 처리 시스템#
구성 요소: 사용자 서비스 → 주문 마이크로서비스 → 재고 관리 → 결제 시스템
아키텍처:
graph LR
A[User Frontend] --> B[Order Service]
B --> C[Inventory Service]
B --> D[Payment Service]
C --> E[Database]
D --> F[Payment Gateway]
Workflow: 사용자 주문 → 주문 서비스 처리 → 재고 확인 → 결제 → 완료
Cloud-Native 요소: 모든 서비스는 Docker + Kubernetes 로 구성되며, 이벤트 브로커 (Kafka) 를 통해 비동기 처리
비교: 기존 모놀리식 대비 장애 분리 가능, 배포 속도 증가, 트래픽 피크 대응 유리
사례 2: 이커머스 플랫폼의 Cloud-Native 도입#
적용 목적: 유연한 트래픽 대응, 빠른 신규 기능 롤아웃, 장애 복원력 확보
시스템 구성도:
graph LR
UI[Frontend]
UI --> APIGW[API Gateway]
APIGW --> Order[Order Service]
APIGW --> Payment[Payment Service]
APIGW --> Product[Product Service]
Order --> OrderDB[(Order DB)]
Payment --> PaymentDB[(Payment DB)]
Product --> ProductDB[(Product DB)]
워크플로우:
- 사용자가 주문 요청을 API Gateway 로 전송
- Gateway 는 Order Service 로 전달
- Order Service 는 주문 정보 저장 및 이벤트 발행 (예: Kafka)
- Payment Service 는 이벤트 수신 후 결제 처리
- 처리 후 결과를 API Gateway 를 통해 사용자에게 응답
역할 분담:
- Frontend: 사용자 요청 입력
- API Gateway: 요청 라우팅 및 인증
- 각 Service: 독립된 도메인 기능 처리
- DB: 서비스별 데이터 저장소로 분리
사례 3: Netflix 클라우드 네이티브 전환#
Netflix 는 2008 년 데이터 센터 장애를 겪은 후 2009 년부터 7 년에 걸쳐 클라우드 네이티브 아키텍처로 완전 전환했다. 현재 전 세계 190 개국에서 2 억 명 이상의 사용자에게 서비스를 제공하며, 일일 시청 시간 10 억 시간을 처리하는 대규모 클라우드 네이티브 시스템을 운영하고 있다.
시스템 구성
graph TB
subgraph "CDN Layer"
CDN[Open Connect CDN]
end
subgraph "AWS Cloud"
subgraph "Frontend Services"
UI[User Interface]
API[API Gateway]
end
subgraph "Core Microservices"
USER[User Service]
RECO[Recommendation Engine]
PLAY[Playback Service]
META[Metadata Service]
BILL[Billing Service]
end
subgraph "Data Layer"
CASS[Cassandra]
ES[Elasticsearch]
REDIS[Redis Cache]
end
subgraph "Platform Services"
EUREKA[Service Discovery]
ZUUL[Load Balancer]
HYSTRIX[Circuit Breaker]
end
subgraph "Analytics"
KAFKA[Kafka Streams]
SPARK[Spark Analytics]
S3[Data Lake S3]
end
end
CDN --> UI
UI --> API
API --> USER
API --> RECO
API --> PLAY
API --> META
API --> BILL
USER --> CASS
RECO --> ES
PLAY --> REDIS
EUREKA --> USER
EUREKA --> RECO
EUREKA --> PLAY
ZUUL --> USER
ZUUL --> RECO
HYSTRIX --> USER
HYSTRIX --> RECO
KAFKA --> SPARK
SPARK --> S3
활용 사례 Workflow:
sequenceDiagram
participant User as 사용자
participant CDN as CDN
participant Gateway as API Gateway
participant Auth as 인증 서비스
participant Profile as 프로필 서비스
participant Reco as 추천 엔진
participant Content as 콘텐츠 서비스
participant Analytics as 분석 서비스
User->>CDN: 웹/앱 접속
CDN->>Gateway: 요청 전달
Gateway->>Auth: 사용자 인증
Auth-->>Gateway: 인증 토큰
Gateway->>Profile: 사용자 프로필 조회
Profile-->>Gateway: 시청 이력, 선호도
Gateway->>Reco: 추천 요청
Reco->>Content: 콘텐츠 메타데이터 조회
Content-->>Reco: 콘텐츠 정보
Reco-->>Gateway: 개인화된 추천 목록
Gateway-->>CDN: 추천 결과
CDN-->>User: 개인화된 홈화면
Note over Analytics: 실시간 사용자 행동 분석
User->>Analytics: 클릭, 시청 이벤트
사례 4: 글로벌 전자상거래 플랫폼#
대규모 온라인 쇼핑몰에서 Cloud-Native Architecture 를 도입하여 트래픽 급증과 글로벌 확장에 대응한 사례
시스템 구성:
graph TB
subgraph "외부 사용자"
A[웹 브라우저]
B[모바일 앱]
C[파트너 API]
end
subgraph "CDN 및 로드밸런서"
D[CloudFlare CDN]
E[AWS ALB]
end
subgraph "API 게이트웨이"
F[Kong Gateway]
G[인증 서비스]
end
subgraph "마이크로서비스 (Kubernetes)"
H[사용자 서비스]
I[상품 서비스]
J[주문 서비스]
K[결제 서비스]
L[배송 서비스]
M[알림 서비스]
end
subgraph "데이터 계층"
N[PostgreSQL<br/>사용자/주문]
O[MongoDB<br/>상품 카탈로그]
P[Redis<br/>세션/캐시]
Q[Elasticsearch<br/>검색]
end
subgraph "메시징"
R[Apache Kafka<br/>이벤트 스트림]
end
subgraph "서비스 메시"
S[Istio Control Plane]
T[Envoy Proxies]
end
A --> D
B --> D
C --> E
D --> E
E --> F
F --> G
G --> H
G --> I
G --> J
G --> K
G --> L
G --> M
H --> N
I --> O
J --> N
K --> N
L --> N
M --> P
I --> Q
J --> R
K --> R
L --> R
M --> R
S --> T
T --> H
T --> I
T --> J
T --> K
T --> L
T --> M
워크플로우:
sequenceDiagram
participant U as 사용자
participant G as API Gateway
participant US as 사용자 서비스
participant PS as 상품 서비스
participant OS as 주문 서비스
participant PayS as 결제 서비스
participant SS as 배송 서비스
participant NS as 알림 서비스
participant K as Kafka
U->>G: 주문 요청
G->>US: 사용자 인증 확인
US-->>G: 인증 성공
G->>PS: 상품 재고 확인
PS-->>G: 재고 확인됨
G->>OS: 주문 생성
OS->>K: 주문 생성 이벤트 발행
OS-->>G: 주문 ID 반환
G-->>U: 주문 확인
K->>PayS: 결제 처리 요청
PayS->>K: 결제 완료 이벤트
K->>SS: 배송 준비 요청
SS->>K: 배송 시작 이벤트
K->>NS: 알림 발송 요청
NS->>U: 주문 확인 알림
Cloud-Native Architecture 의 역할:
트래픽 급증 대응
- 자동 스케일링으로 블랙 프라이데이 300% 트래픽 증가 처리
- 마이크로서비스별 독립적 확장으로 리소스 효율성 달성
- CDN 과 캐싱 전략으로 응답 시간 50% 개선
글로벌 서비스 제공
- 멀티 리전 배포로 지역별 데이터 센터 운영
- 지역별 데이터 컴플라이언스 요구사항 충족
- 24/7 무중단 서비스 운영
개발 생산성 향상
- 마이크로서비스별 독립 개발팀 운영
- CI/CD 파이프라인으로 일 평균 50 회 배포
- 기능별 A/B 테스트 및 카나리 배포
구현 예시#
전자상거래 플랫폼의 주문 처리 마이크로서비스
1
2
3
4
5
6
7
8
9
10
| # 전체 마이크로서비스 아키텍처 구성도
# Client → API Gateway (Kong) → Service Mesh (Istio) → Order Service
# ↓
# User Service, Product Service, Payment Service
# ↓
# Database Layer (PostgreSQL)
# ↓
# Cache Layer (Redis)
# ↓
# Message Queue (Kafka)
|
이러한 구조는 마이크로서비스 아키텍처의 핵심 원칙들을 따른다:
- 단일 책임 원칙: 각 모듈과 서비스가 명확한 역할을 가진다.
- 느슨한 결합: 인터페이스를 통한 통신으로 의존성을 최소화한다.
- 높은 응집도: 관련된 기능들이 하나의 모듈에 응집되어 있다.
- 확장성: 수평적 스케일링과 로드 밸런싱을 지원한다.
- 장애 격리: 서킷 브레이커, 재시도, 타임아웃 등으로 장애 전파를 방지한다.
- 관찰 가능성: 로깅, 메트릭, 트레이싱을 통한 시스템 가시성을 제공한다.
- 보안: 네트워크 정책, RBAC, 시크릿 관리를 통한 다층 보안을 구현한다.
설정 관리 레이어 (Configuration Layer)#
config.py#
애플리케이션의 모든 설정을 중앙화하여 관리하는 모듈. 데이터베이스, Redis, Kafka, 외부 서비스 등의 연결 설정을 환경 변수로부터 읽어와 Pydantic BaseSettings 를 통해 검증하고 타입 안전성을 보장한다. 개발/운영 환경 구분과 설정 검증 기능을 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| # 환경 설정 및 구성 관리 모듈
# 모든 환경 변수와 애플리케이션 설정을 중앙화하여 관리
import os
from typing import Optional
from pydantic import BaseSettings, Field
class DatabaseConfig(BaseSettings):
"""데이터베이스 연결 설정"""
url: str = Field(default="postgresql://user:password@localhost/orderdb", env="DATABASE_URL")
pool_pre_ping: bool = True
pool_recycle: int = 300
echo: bool = Field(default=False, env="DB_ECHO") # SQL 쿼리 로깅
class RedisConfig(BaseSettings):
"""Redis 캐시 설정"""
url: str = Field(default="redis://localhost:6379", env="REDIS_URL")
decode_responses: bool = True
socket_timeout: int = 5
socket_connect_timeout: int = 5
default_ttl: int = 300 # 기본 캐시 만료 시간 (초)
class KafkaConfig(BaseSettings):
"""Kafka 메시징 설정"""
bootstrap_servers: str = Field(default="localhost:9092", env="KAFKA_SERVERS")
topic_prefix: str = Field(default="ecommerce", env="KAFKA_TOPIC_PREFIX")
order_events_topic: str = "order-events"
retry_backoff_ms: int = 1000
request_timeout_ms: int = 30000
class ExternalServiceConfig(BaseSettings):
"""외부 서비스 연결 설정"""
user_service_url: str = Field(default="http://user-service:8080", env="USER_SERVICE_URL")
product_service_url: str = Field(default="http://product-service:8080", env="PRODUCT_SERVICE_URL")
payment_service_url: str = Field(default="http://payment-service:8080", env="PAYMENT_SERVICE_URL")
http_timeout: int = 30 # HTTP 요청 타임아웃 (초)
retry_attempts: int = 3 # 재시도 횟수
class ApplicationConfig(BaseSettings):
"""애플리케이션 전역 설정"""
app_name: str = "Order Service"
app_version: str = "1.0.0"
environment: str = Field(default="development", env="ENVIRONMENT")
log_level: str = Field(default="INFO", env="LOG_LEVEL")
debug: bool = Field(default=False, env="DEBUG")
# API 설정
api_prefix: str = "/api/v1"
docs_url: Optional[str] = "/docs"
redoc_url: Optional[str] = "/redoc"
# 보안 설정
cors_origins: list = ["*"] # 운영환경에서는 특정 도메인으로 제한
cors_methods: list = ["*"]
cors_headers: list = ["*"]
class Settings:
"""통합 설정 클래스"""
def __init__(self):
self.database = DatabaseConfig()
self.redis = RedisConfig()
self.kafka = KafkaConfig()
self.external_services = ExternalServiceConfig()
self.app = ApplicationConfig()
@property
def is_production(self) -> bool:
"""운영 환경 여부 확인"""
return self.app.environment.lower() == "production"
@property
def is_development(self) -> bool:
"""개발 환경 여부 확인"""
return self.app.environment.lower() == "development"
# 전역 설정 인스턴스
settings = Settings()
|
utils.py#
공통 유틸리티 기능들을 제공하는 모듈. 구조화된 JSON 로깅, 메트릭 수집, 실행 시간 측정 데코레이터, 헬스 체크 관리 등의 기능을 포함한다. 특히 StructuredLogger 는 마이크로서비스 환경에서 로그 분석을 용이하게 하는 JSON 형태의 로그를 생성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
| # 공통 유틸리티 및 로깅 설정 모듈
# 구조화된 로그, 헬스 체크, 메트릭 수집 등 공통 기능 제공
import logging
import json
import time
from datetime import datetime
from typing import Dict, Any
from functools import wraps
from config import settings
class StructuredLogger:
"""구조화된 JSON 로그 생성기"""
def __init__(self, service_name: str = "order-service"):
self.service_name = service_name
self.logger = logging.getLogger(service_name)
# 로그 핸들러 설정
handler = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(getattr(logging, settings.app.log_level.upper()))
def _format_log(self, level: str, message: str, **kwargs) -> str:
"""로그 메시지를 JSON 형태로 포맷팅"""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": level,
"service": self.service_name,
"message": message,
"environment": settings.app.environment
}
# 추가 컨텍스트 정보 포함
if kwargs:
log_data.update(kwargs)
return json.dumps(log_data, ensure_ascii=False)
def info(self, message: str, **kwargs):
"""정보 레벨 로그"""
self.logger.info(self._format_log("INFO", message, **kwargs))
def error(self, message: str, **kwargs):
"""에러 레벨 로그"""
self.logger.error(self._format_log("ERROR", message, **kwargs))
def warning(self, message: str, **kwargs):
"""경고 레벨 로그"""
self.logger.warning(self._format_log("WARNING", message, **kwargs))
def debug(self, message: str, **kwargs):
"""디버그 레벨 로그"""
self.logger.debug(self._format_log("DEBUG", message, **kwargs))
# 전역 로거 인스턴스
logger = StructuredLogger()
class MetricsCollector:
"""애플리케이션 메트릭 수집기 (Prometheus 연동)"""
def __init__(self):
self.metrics = {}
self.start_time = time.time()
def increment_counter(self, metric_name: str, labels: Dict[str, str] = None):
"""카운터 메트릭 증가"""
key = f"{metric_name}_{labels}" if labels else metric_name
self.metrics[key] = self.metrics.get(key, 0) + 1
def record_histogram(self, metric_name: str, value: float, labels: Dict[str, str] = None):
"""히스토그램 메트릭 기록"""
key = f"{metric_name}_histogram_{labels}" if labels else f"{metric_name}_histogram"
if key not in self.metrics:
self.metrics[key] = []
self.metrics[key].append(value)
def get_metrics(self) -> Dict[str, Any]:
"""수집된 메트릭 반환"""
return {
"uptime_seconds": time.time() - self.start_time,
"metrics": self.metrics,
"timestamp": datetime.utcnow().isoformat()
}
# 전역 메트릭 수집기
metrics = MetricsCollector()
def log_execution_time(operation_name: str):
"""함수 실행 시간을 로그로 기록하는 데코레이터"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
logger.info(
f"{operation_name} 실행 완료",
execution_time=execution_time,
function=func.__name__
)
# 메트릭 기록
metrics.record_histogram(f"{operation_name}_duration", execution_time)
metrics.increment_counter(f"{operation_name}_total")
return result
except Exception as e:
execution_time = time.time() - start_time
logger.error(
f"{operation_name} 실행 실패",
error=str(e),
execution_time=execution_time,
function=func.__name__
)
# 에러 메트릭 기록
metrics.increment_counter(f"{operation_name}_errors")
raise
return wrapper
return decorator
class HealthChecker:
"""헬스 체크 관리 클래스"""
def __init__(self):
self.checks = {}
def register_check(self, name: str, check_func):
"""헬스 체크 함수 등록"""
self.checks[name] = check_func
async def run_checks(self) -> Dict[str, Any]:
"""모든 헬스 체크 실행"""
results = {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"checks": {}
}
overall_status = True
for name, check_func in self.checks.items():
try:
check_result = await check_func()
results["checks"][name] = {
"status": "healthy" if check_result else "unhealthy",
"details": check_result
}
if not check_result:
overall_status = False
except Exception as e:
results["checks"][name] = {
"status": "unhealthy",
"error": str(e)
}
overall_status = False
results["status"] = "healthy" if overall_status else "unhealthy"
return results
# 전역 헬스 체커
health_checker = HealthChecker()
|
데이터베이스 레이어 (Database Layer)#
database.py#
SQLAlchemy 를 사용한 데이터베이스 연결 및 세션 관리를 담당. 연결 풀 관리, 트랜잭션 처리, 컨텍스트 매니저를 통한 안전한 세션 관리 기능을 제공한다. 데이터베이스 이벤트 리스너를 통해 연결 최적화와 쿼리 로깅을 수행한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
| # 데이터베이스 연결 및 세션 관리 모듈
# SQLAlchemy ORM 설정과 데이터베이스 연결 풀 관리
from sqlalchemy import create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from contextlib import contextmanager
from config import settings
from utils import logger
class DatabaseManager:
"""데이터베이스 연결 및 세션 관리 클래스"""
def __init__(self):
self.engine = None
self.SessionLocal = None
self.Base = declarative_base()
self._initialize_engine()
def _initialize_engine(self):
"""SQLAlchemy 엔진 초기화"""
self.engine = create_engine(
settings.database.url,
poolclass=QueuePool,
pool_size=10, # 기본 연결 풀 크기
max_overflow=20, # 최대 오버플로우 연결 수
pool_pre_ping=settings.database.pool_pre_ping,
pool_recycle=settings.database.pool_recycle,
echo=settings.database.echo,
pool_timeout=30, # 연결 대기 타임아웃
pool_reset_on_return='commit' # 연결 반환 시 커밋
)
# 데이터베이스 연결 이벤트 리스너 등록
self._register_event_listeners()
# 세션 팩토리 생성
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine,
expire_on_commit=False # 커밋 후 객체 만료 방지
)
logger.info("데이터베이스 엔진 초기화 완료")
def _register_event_listeners(self):
"""데이터베이스 이벤트 리스너 등록"""
@event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
"""연결 시 데이터베이스별 설정 적용"""
if "sqlite" in str(dbapi_connection):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
@event.listens_for(self.engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
"""쿼리 실행 전 로깅"""
if settings.database.echo:
logger.debug("SQL 쿼리 실행", query=statement, params=parameters)
def create_tables(self):
"""데이터베이스 테이블 생성"""
try:
self.Base.metadata.create_all(bind=self.engine)
logger.info("데이터베이스 테이블 생성 완료")
except Exception as e:
logger.error("데이터베이스 테이블 생성 실패", error=str(e))
raise
def get_session(self) -> Session:
"""데이터베이스 세션 생성"""
return self.SessionLocal()
@contextmanager
def get_session_context(self):
"""컨텍스트 매니저를 사용한 세션 관리"""
session = self.get_session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error("데이터베이스 트랜잭션 롤백", error=str(e))
raise
finally:
session.close()
async def health_check(self) -> bool:
"""데이터베이스 연결 상태 확인"""
try:
with self.get_session_context() as session:
session.execute("SELECT 1")
return True
except Exception as e:
logger.error("데이터베이스 헬스 체크 실패", error=str(e))
return False
def close(self):
"""데이터베이스 연결 종료"""
if self.engine:
self.engine.dispose()
logger.info("데이터베이스 연결 종료")
# 전역 데이터베이스 매니저 인스턴스
db_manager = DatabaseManager()
# 의존성 주입용 함수
def get_db() -> Session:
"""FastAPI 의존성 주입용 데이터베이스 세션 생성 함수"""
session = db_manager.get_session()
try:
yield session
finally:
session.close()
|
models.py#
데이터베이스 모델 (SQLAlchemy ORM) 과 API 스키마 (Pydantic) 를 정의한다. 주문 상태 열거형, 주문 및 주문 아이템 모델, 요청/응답 스키마들을 포함한다. 데이터 검증, 직렬화/역직렬화, 인덱스 최적화 등의 기능을 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
| # 데이터베이스 모델 및 API 스키마 정의 모듈
# SQLAlchemy ORM 모델과 Pydantic 스키마를 분리하여 관리
from sqlalchemy import Column, Integer, String, Float, DateTime, Enum, Index, ForeignKey
from sqlalchemy.orm import relationship
from pydantic import BaseModel, Field, validator
from datetime import datetime
from typing import Optional, List
import enum
from database import db_manager
# SQLAlchemy 베이스 클래스
Base = db_manager.Base
# 주문 상태 열거형
class OrderStatus(str, enum.Enum):
"""주문 상태 정의"""
PENDING = "pending" # 주문 대기
CONFIRMED = "confirmed" # 주문 확인
PAID = "paid" # 결제 완료
PROCESSING = "processing" # 주문 처리 중
SHIPPED = "shipped" # 배송 중
DELIVERED = "delivered" # 배송 완료
CANCELLED = "cancelled" # 주문 취소
REFUNDED = "refunded" # 환불 완료
class PaymentStatus(str, enum.Enum):
"""결제 상태 정의"""
PENDING = "pending" # 결제 대기
PROCESSING = "processing" # 결제 처리 중
COMPLETED = "completed" # 결제 완료
FAILED = "failed" # 결제 실패
CANCELLED = "cancelled" # 결제 취소
REFUNDED = "refunded" # 환불 완료
# SQLAlchemy 모델 정의
class OrderModel(Base):
"""주문 정보 데이터베이스 모델"""
__tablename__ = "orders"
# 기본 필드
id = Column(Integer, primary_key=True, index=True, comment="주문 ID")
order_number = Column(String(50), unique=True, nullable=False, index=True, comment="주문 번호")
user_id = Column(Integer, nullable=False, index=True, comment="사용자 ID")
# 주문 상세 정보
product_id = Column(Integer, nullable=False, comment="상품 ID")
product_name = Column(String(255), nullable=False, comment="상품명")
product_price = Column(Float, nullable=False, comment="상품 단가")
quantity = Column(Integer, nullable=False, comment="주문 수량")
total_amount = Column(Float, nullable=False, comment="총 주문 금액")
# 상태 및 메타데이터
status = Column(Enum(OrderStatus), default=OrderStatus.PENDING, nullable=False, comment="주문 상태")
payment_status = Column(Enum(PaymentStatus), default=PaymentStatus.PENDING, nullable=False, comment="결제 상태")
# 주소 정보
shipping_address = Column(String(500), comment="배송 주소")
billing_address = Column(String(500), comment="청구지 주소")
# 추가 정보
notes = Column(String(1000), comment="주문 메모")
metadata = Column(String(2000), comment="추가 메타데이터 (JSON)")
# 타임스탬프
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, comment="생성 시간")
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False, comment="수정 시간")
confirmed_at = Column(DateTime, comment="주문 확인 시간")
shipped_at = Column(DateTime, comment="배송 시작 시간")
delivered_at = Column(DateTime, comment="배송 완료 시간")
cancelled_at = Column(DateTime, comment="취소 시간")
# 인덱스 정의 (쿼리 성능 최적화)
__table_args__ = (
Index('idx_user_created', 'user_id', 'created_at'),
Index('idx_status_created', 'status', 'created_at'),
Index('idx_payment_status', 'payment_status'),
)
class OrderItemModel(Base):
"""주문 아이템 상세 정보 모델 (다중 상품 주문 지원)"""
__tablename__ = "order_items"
id = Column(Integer, primary_key=True, index=True)
order_id = Column(Integer, ForeignKey('orders.id'), nullable=False, comment="주문 ID")
product_id = Column(Integer, nullable=False, comment="상품 ID")
product_name = Column(String(255), nullable=False, comment="상품명")
product_sku = Column(String(100), comment="상품 SKU")
unit_price = Column(Float, nullable=False, comment="단가")
quantity = Column(Integer, nullable=False, comment="수량")
subtotal = Column(Float, nullable=False, comment="소계")
# 관계 정의
order = relationship("OrderModel", backref="items")
# Pydantic 스키마 정의 (API 요청/응답)
class OrderCreateRequest(BaseModel):
"""주문 생성 요청 스키마"""
user_id: int = Field(..., gt=0, description="사용자 ID")
product_id: int = Field(..., gt=0, description="상품 ID")
quantity: int = Field(..., gt=0, le=1000, description="주문 수량")
shipping_address: Optional[str] = Field(None, max_length=500, description="배송 주소")
billing_address: Optional[str] = Field(None, max_length=500, description="청구지 주소")
notes: Optional[str] = Field(None, max_length=1000, description="주문 메모")
@validator('quantity')
def validate_quantity(cls, v):
"""수량 유효성 검증"""
if v <= 0:
raise ValueError('수량은 0보다 커야 합니다')
if v > 1000:
raise ValueError('수량은 1000개를 초과할 수 없습니다')
return v
class OrderItemCreate(BaseModel):
"""주문 아이템 생성 스키마"""
product_id: int = Field(..., gt=0, description="상품 ID")
quantity: int = Field(..., gt=0, description="수량")
class MultiOrderCreateRequest(BaseModel):
"""다중 상품 주문 생성 요청 스키마"""
user_id: int = Field(..., gt=0, description="사용자 ID")
items: List[OrderItemCreate] = Field(..., min_items=1, max_items=50, description="주문 아이템 목록")
shipping_address: Optional[str] = Field(None, max_length=500, description="배송 주소")
billing_address: Optional[str] = Field(None, max_length=500, description="청구지 주소")
notes: Optional[str] = Field(None, max_length=1000, description="주문 메모")
class OrderStatusUpdateRequest(BaseModel):
"""주문 상태 업데이트 요청 스키마"""
status: OrderStatus = Field(..., description="변경할 주문 상태")
notes: Optional[str] = Field(None, max_length=500, description="상태 변경 사유")
class PaymentStatusUpdateRequest(BaseModel):
"""결제 상태 업데이트 요청 스키마"""
payment_status: PaymentStatus = Field(..., description="변경할 결제 상태")
transaction_id: Optional[str] = Field(None, description="결제 거래 ID")
class OrderResponse(BaseModel):
"""주문 응답 스키마"""
id: int
order_number: str
user_id: int
product_id: int
product_name: str
product_price: float
quantity: int
total_amount: float
status: OrderStatus
payment_status: PaymentStatus
shipping_address: Optional[str]
billing_address: Optional[str]
notes: Optional[str]
created_at: datetime
updated_at: datetime
confirmed_at: Optional[datetime]
shipped_at: Optional[datetime]
delivered_at: Optional[datetime]
cancelled_at: Optional[datetime]
class Config:
from_attributes = True
class OrderListResponse(BaseModel):
"""주문 목록 응답 스키마"""
orders: List[OrderResponse]
total_count: int
page: int
page_size: int
has_next: bool
has_previous: bool
class OrderSummaryResponse(BaseModel):
"""주문 요약 응답 스키마"""
total_orders: int
pending_orders: int
completed_orders: int
cancelled_orders: int
total_revenue: float
average_order_value: float
|
캐시 레이어 (Cache Layer)#
cache.py#
Redis 를 활용한 캐시 관리 시스템. 주문 정보 캐싱, 분산 락 구현, 세션 관리 기능을 제공한다. Cache-Aside 패턴을 구현하여 데이터베이스 부하를 줄이고 응답 시간을 개선한다. 분산 락은 동시성 제어를 위해 Lua 스크립트를 사용하여 원자적 연산을 보장한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
| # Redis 캐시 관리 모듈
# 주문 정보 캐싱, 세션 관리, 분산 락 구현
import redis
import json
import asyncio
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from config import settings
from utils import logger, log_execution_time
class CacheManager:
"""Redis 캐시 관리 클래스"""
def __init__(self):
self.redis_client = None
self._initialize_client()
def _initialize_client(self):
"""Redis 클라이언트 초기화"""
try:
self.redis_client = redis.from_url(
settings.redis.url,
decode_responses=settings.redis.decode_responses,
socket_timeout=settings.redis.socket_timeout,
socket_connect_timeout=settings.redis.socket_connect_timeout,
retry_on_timeout=True,
health_check_interval=30
)
# 연결 테스트
self.redis_client.ping()
logger.info("Redis 클라이언트 초기화 완료")
except Exception as e:
logger.error("Redis 연결 실패", error=str(e))
raise
class OrderCacheService:
"""주문 정보 캐시 서비스"""
def __init__(self, cache_manager: CacheManager):
self.cache = cache_manager.redis_client
self.default_ttl = settings.redis.default_ttl
def _get_order_key(self, order_id: int) -> str:
"""주문 캐시 키 생성"""
return f"order:{order_id}"
def _get_user_orders_key(self, user_id: int) -> str:
"""사용자 주문 목록 캐시 키 생성"""
return f"user_orders:{user_id}"
@log_execution_time("cache_order_set")
async def set_order(self, order_id: int, order_data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
"""주문 정보 캐시 저장"""
try:
key = self._get_order_key(order_id)
ttl = ttl or self.default_ttl
# 데이터 직렬화 (datetime 객체 처리)
serialized_data = json.dumps(order_data, default=str, ensure_ascii=False)
result = self.cache.setex(key, ttl, serialized_data)
if result:
logger.info("주문 캐시 저장 완료", order_id=order_id, ttl=ttl)
return result
except Exception as e:
logger.error("주문 캐시 저장 실패", order_id=order_id, error=str(e))
return False
@log_execution_time("cache_order_get")
async def get_order(self, order_id: int) -> Optional[Dict[str, Any]]:
"""주문 정보 캐시 조회"""
try:
key = self._get_order_key(order_id)
cached_data = self.cache.get(key)
if cached_data:
logger.info("캐시에서 주문 조회 성공", order_id=order_id)
return json.loads(cached_data)
logger.debug("캐시에서 주문 정보 없음", order_id=order_id)
return None
except Exception as e:
logger.error("주문 캐시 조회 실패", order_id=order_id, error=str(e))
return None
@log_execution_time("cache_order_delete")
async def delete_order(self, order_id: int) -> bool:
"""주문 정보 캐시 삭제"""
try:
key = self._get_order_key(order_id)
result = self.cache.delete(key)
if result:
logger.info("주문 캐시 삭제 완료", order_id=order_id)
return bool(result)
except Exception as e:
logger.error("주문 캐시 삭제 실패", order_id=order_id, error=str(e))
return False
async def set_user_orders(self, user_id: int, order_list: List[Dict[str, Any]], ttl: Optional[int] = None) -> bool:
"""사용자 주문 목록 캐시 저장"""
try:
key = self._get_user_orders_key(user_id)
ttl = ttl or self.default_ttl
serialized_data = json.dumps(order_list, default=str, ensure_ascii=False)
result = self.cache.setex(key, ttl, serialized_data)
if result:
logger.info("사용자 주문 목록 캐시 저장 완료", user_id=user_id, count=len(order_list))
return result
except Exception as e:
logger.error("사용자 주문 목록 캐시 저장 실패", user_id=user_id, error=str(e))
return False
async def get_user_orders(self, user_id: int) -> Optional[List[Dict[str, Any]]]:
"""사용자 주문 목록 캐시 조회"""
try:
key = self._get_user_orders_key(user_id)
cached_data = self.cache.get(key)
if cached_data:
logger.info("캐시에서 사용자 주문 목록 조회 성공", user_id=user_id)
return json.loads(cached_data)
return None
except Exception as e:
logger.error("사용자 주문 목록 캐시 조회 실패", user_id=user_id, error=str(e))
return None
class DistributedLockService:
"""분산 락 서비스 (동시성 제어)"""
def __init__(self, cache_manager: CacheManager):
self.cache = cache_manager.redis_client
async def acquire_lock(self, resource_id: str, timeout: int = 30, blocking_timeout: int = 10) -> Optional[str]:
"""분산 락 획득"""
lock_key = f"lock:{resource_id}"
lock_value = f"{datetime.utcnow().isoformat()}:{resource_id}"
try:
# 논블로킹 락 시도
acquired = self.cache.set(lock_key, lock_value, nx=True, ex=timeout)
if acquired:
logger.info("분산 락 획득 성공", resource_id=resource_id, lock_value=lock_value)
return lock_value
# 블로킹 락 시도
start_time = datetime.utcnow()
while (datetime.utcnow() - start_time).total_seconds() < blocking_timeout:
await asyncio.sleep(0.1)
acquired = self.cache.set(lock_key, lock_value, nx=True, ex=timeout)
if acquired:
logger.info("분산 락 획득 성공 (블로킹)", resource_id=resource_id)
return lock_value
logger.warning("분산 락 획득 타임아웃", resource_id=resource_id)
return None
except Exception as e:
logger.error("분산 락 획득 실패", resource_id=resource_id, error=str(e))
return None
async def release_lock(self, resource_id: str, lock_value: str) -> bool:
"""분산 락 해제"""
lock_key = f"lock:{resource_id}"
try:
# Lua 스크립트를 사용한 원자적 락 해제
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.cache.eval(lua_script, 1, lock_key, lock_value)
if result:
logger.info("분산 락 해제 완료", resource_id=resource_id)
else:
logger.warning("분산 락 해제 실패 - 잘못된 락 값", resource_id=resource_id)
return bool(result)
except Exception as e:
logger.error("분산 락 해제 실패", resource_id=resource_id, error=str(e))
return False
class SessionCacheService:
"""세션 캐시 서비스"""
def __init__(self, cache_manager: CacheManager):
self.cache = cache_manager.redis_client
self.session_ttl = 3600 # 1시간
async def set_session(self, session_id: str, session_data: Dict[str, Any]) -> bool:
"""세션 데이터 저장"""
try:
key = f"session:{session_id}"
serialized_data = json.dumps(session_data, default=str)
result = self.cache.setex(key, self.session_ttl, serialized_data)
if result:
logger.info("세션 저장 완료", session_id=session_id)
return result
except Exception as e:
logger.error("세션 저장 실패", session_id=session_id, error=str(e))
return False
async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""세션 데이터 조회"""
try:
key = f"session:{session_id}"
cached_data = self.cache.get(key)
if cached_data:
# TTL 갱신
self.cache.expire(key, self.session_ttl)
return json.loads(cached_data)
return None
except Exception as e:
logger.error("세션 조회 실패", session_id=session_id, error=str(e))
return None
# 전역 캐시 서비스 인스턴스
cache_manager = CacheManager()
order_cache = OrderCacheService(cache_manager)
distributed_lock = DistributedLockService(cache_manager)
session_cache = SessionCacheService(cache_manager)
async def health_check_redis() -> bool:
"""Redis 헬스 체크"""
try:
cache_manager.redis_client.ping()
return True
except Exception as e:
logger.error("Redis 헬스 체크 실패", error=str(e))
return False
|
메시징 레이어 (Messaging Layer)#
messaging.py#
Kafka 를 통한 비동기 이벤트 기반 아키텍처를 구현한다. 이벤트 발행자 (Publisher) 와 구독자 (Consumer) 를 제공하며, 주문 생성, 상태 변경, 결제 완료 등의 도메인 이벤트를 처리한다. 재시도 로직, 멱등성 보장, 이벤트 순서 보장 등의 기능을 포함한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
| # Kafka 이벤트 메시징 모듈
# 비동기 이벤트 발행 및 구독을 통한 마이크로서비스 간 통신
import json
import asyncio
from typing import Dict, Any, Optional, Callable
from datetime import datetime
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.errors import KafkaError
from config import settings
from utils import logger, log_execution_time
class EventPublisher:
"""Kafka 이벤트 발행자"""
def __init__(self):
self.producer: Optional[AIOKafkaProducer] = None
self.is_started = False
async def start(self):
"""Kafka 프로듀서 시작"""
try:
self.producer = AIOKafkaProducer(
bootstrap_servers=settings.kafka.bootstrap_servers,
value_serializer=self._serialize_value,
key_serializer=self._serialize_key,
retry_backoff_ms=settings.kafka.retry_backoff_ms,
request_timeout_ms=settings.kafka.request_timeout_ms,
acks='all', # 모든 리플리카에서 확인
enable_idempotence=True, # 멱등성 보장
max_in_flight_requests_per_connection=1, # 순서 보장
compression_type='gzip' # 압축
)
await self.producer.start()
self.is_started = True
logger.info("Kafka 프로듀서 시작 완료")
except Exception as e:
logger.error("Kafka 프로듀서 시작 실패", error=str(e))
raise
async def stop(self):
"""Kafka 프로듀서 종료"""
if self.producer and self.is_started:
await self.producer.stop()
self.is_started = False
logger.info("Kafka 프로듀서 종료 완료")
def _serialize_value(self, value: Any) -> bytes:
"""메시지 값 직렬화"""
return json.dumps(value, default=str, ensure_ascii=False).encode('utf-8')
def _serialize_key(self, key: str) -> bytes:
"""메시지 키 직렬화"""
return key.encode('utf-8')
@log_execution_time("kafka_publish_event")
async def publish_event(self, topic: str, event_type: str, data: Dict[str, Any], key: Optional[str] = None) -> bool:
"""이벤트 발행"""
if not self.is_started:
logger.error("Kafka 프로듀서가 시작되지 않음")
return False
try:
# 이벤트 메시지 구성
event_message = {
"event_id": f"{event_type}_{datetime.utcnow().isoformat()}_{id(data)}",
"event_type": event_type,
"event_version": "1.0",
"timestamp": datetime.utcnow().isoformat(),
"source": "order-service",
"data": data,
"metadata": {
"correlation_id": data.get("correlation_id"),
"trace_id": data.get("trace_id")
}
}
# 토픽에 메시지 전송
await self.producer.send(
topic=topic,
value=event_message,
key=key
)
# 버퍼 플러시 (즉시 전송 보장)
await self.producer.flush()
logger.info(
"이벤트 발행 완료",
topic=topic,
event_type=event_type,
event_id=event_message["event_id"],
key=key
)
return True
except KafkaError as e:
logger.error(
"Kafka 이벤트 발행 실패",
topic=topic,
event_type=event_type,
error=str(e)
)
return False
except Exception as e:
logger.error(
"이벤트 발행 중 예외 발생",
topic=topic,
event_type=event_type,
error=str(e)
)
return False
class EventConsumer:
"""Kafka 이벤트 구독자"""
def __init__(self, group_id: str):
self.group_id = group_id
self.consumer: Optional[AIOKafkaConsumer] = None
self.is_started = False
self.event_handlers: Dict[str, Callable] = {}
def register_handler(self, event_type: str, handler: Callable):
"""이벤트 핸들러 등록"""
self.event_handlers[event_type] = handler
logger.info("이벤트 핸들러 등록", event_type=event_type, handler=handler.__name__)
async def start(self, topics: list):
"""Kafka 컨슈머 시작"""
try:
self.consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=settings.kafka.bootstrap_servers,
group_id=self.group_id,
value_deserializer=self._deserialize_value,
key_deserializer=self._deserialize_key,
enable_auto_commit=False, # 수동 커밋
auto_offset_reset='earliest',
session_timeout_ms=30000,
heartbeat_interval_ms=10000
)
await self.consumer.start()
self.is_started = True
logger.info("Kafka 컨슈머 시작 완료", group_id=self.group_id, topics=topics)
except Exception as e:
logger.error("Kafka 컨슈머 시작 실패", group_id=self.group_id, error=str(e))
raise
async def stop(self):
"""Kafka 컨슈머 종료"""
if self.consumer and self.is_started:
await self.consumer.stop()
self.is_started = False
logger.info("Kafka 컨슈머 종료 완료", group_id=self.group_id)
def _deserialize_value(self, value: bytes) -> Dict[str, Any]:
"""메시지 값 역직렬화"""
return json.loads(value.decode('utf-8'))
def _deserialize_key(self, key: bytes) -> str:
"""메시지 키 역직렬화"""
return key.decode('utf-8') if key else None
async def consume_events(self):
"""이벤트 소비 및 처리"""
if not self.is_started:
logger.error("Kafka 컨슈머가 시작되지 않음")
return
try:
async for message in self.consumer:
await self._process_message(message)
except Exception as e:
logger.error("이벤트 소비 중 예외 발생", group_id=self.group_id, error=str(e))
async def _process_message(self, message):
"""메시지 처리"""
try:
event = message.value
event_type = event.get("event_type")
logger.info(
"이벤트 수신",
topic=message.topic,
partition=message.partition,
offset=message.offset,
event_type=event_type,
event_id=event.get("event_id")
)
# 이벤트 핸들러 실행
if event_type in self.event_handlers:
handler = self.event_handlers[event_type]
await handler(event)
# 처리 완료 후 커밋
await self.consumer.commit()
logger.info(
"이벤트 처리 완료",
event_type=event_type,
handler=handler.__name__
)
else:
logger.warning("등록되지 않은 이벤트 타입", event_type=event_type)
# 처리할 수 없는 메시지도 커밋 (DLQ로 보내거나 로그만 남김)
await self.consumer.commit()
except Exception as e:
logger.error(
"메시지 처리 실패",
topic=message.topic,
offset=message.offset,
error=str(e)
)
# 에러 발생 시 커밋하지 않음 (재처리 대상)
class OrderEventService:
"""주문 관련 이벤트 서비스"""
def __init__(self, event_publisher: EventPublisher):
self.publisher = event_publisher
self.topic = f"{settings.kafka.topic_prefix}.{settings.kafka.order_events_topic}"
async def publish_order_created(self, order_data: Dict[str, Any]) -> bool:
"""주문 생성 이벤트 발행"""
return await self.publisher.publish_event(
topic=self.topic,
event_type="order_created",
data=order_data,
key=str(order_data.get("id"))
)
async def publish_order_updated(self, order_data: Dict[str, Any]) -> bool:
"""주문 업데이트 이벤트 발행"""
return await self.publisher.publish_event(
topic=self.topic,
event_type="order_updated",
data=order_data,
key=str(order_data.get("id"))
)
async def publish_order_status_changed(self, order_data: Dict[str, Any]) -> bool:
"""주문 상태 변경 이벤트 발행"""
return await self.publisher.publish_event(
topic=self.topic,
event_type="order_status_changed",
data=order_data,
key=str(order_data.get("id"))
)
async def publish_order_cancelled(self, order_data: Dict[str, Any]) -> bool:
"""주문 취소 이벤트 발행"""
return await self.publisher.publish_event(
topic=self.topic,
event_type="order_cancelled",
data=order_data,
key=str(order_data.get("id"))
)
async def publish_payment_completed(self, payment_data: Dict[str, Any]) -> bool:
"""결제 완료 이벤트 발행"""
return await self.publisher.publish_event(
topic=self.topic,
event_type="payment_completed",
data=payment_data,
key=str(payment_data.get("order_id"))
)
# 전역 이벤트 서비스 인스턴스
event_publisher = EventPublisher()
order_event_service = OrderEventService(event_publisher)
# 이벤트 핸들러 예시
async def handle_payment_completed(event: Dict[str, Any]):
"""결제 완료 이벤트 핸들러"""
try:
payment_data = event["data"]
order_id = payment_data.get("order_id")
logger.info("결제 완료 이벤트 처리 시작", order_id=order_id)
# 주문 상태를 '결제 완료'로 업데이트하는 로직
# (실제 구현에서는 서비스 레이어 호출)
logger.info("결제 완료 이벤트 처리 완료", order_id=order_id)
except Exception as e:
logger.error("결제 완료 이벤트 처리 실패", error=str(e))
raise
async def handle_inventory_updated(event: Dict[str, Any]):
"""재고 업데이트 이벤트 핸들러"""
try:
inventory_data = event["data"]
product_id = inventory_data.get("product_id")
available_quantity = inventory_data.get("available_quantity")
logger.info("재고 업데이트 이벤트 처리", product_id=product_id, quantity=available_quantity)
# 재고 부족 시 대기 중인 주문 처리 로직
# (실제 구현에서는 서비스 레이어 호출)
except Exception as e:
logger.error("재고 업데이트 이벤트 처리 실패", error=str(e))
raise
# 이벤트 컨슈머 설정 (백그라운드 태스크로 실행)
async def setup_event_consumers():
"""이벤트 컨슈머 설정 및 시작"""
# 주문 서비스 컨슈머
order_consumer = EventConsumer("order-service-group")
order_consumer.register_handler("payment_completed", handle_payment_completed)
order_consumer.register_handler("inventory_updated", handle_inventory_updated)
# 컨슈머 시작
await order_consumer.start([
f"{settings.kafka.topic_prefix}.payment-events",
f"{settings.kafka.topic_prefix}.inventory-events"
])
# 백그라운드에서 이벤트 소비
asyncio.create_task(order_consumer.consume_events())
return order_consumer
|
서비스 레이어 (Service Layer)#
services.py#
핵심 비즈니스 로직을 담당하는 서비스 계층. 외부 서비스 통신, 주문 생성/조회/업데이트, 상태 전환 검증, 트랜잭션 관리 등을 처리한다. 분산 락을 사용한 동시성 제어, 재고 예약, 결제 연동 등의 복잡한 비즈니스 프로세스를 구현한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
| # 비즈니스 로직 서비스 레이어 모듈
# 주문 처리, 외부 서비스 통신, 트랜잭션 관리를 담당
import asyncio
import httpx
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc, func
import uuid
from models import (
OrderModel, OrderCreateRequest, OrderResponse, OrderStatus,
PaymentStatus, OrderStatusUpdateRequest, MultiOrderCreateRequest
)
from database import db_manager
from cache import order_cache, distributed_lock
from messaging import order_event_service
from config import settings
from utils import logger, log_execution_time, metrics
class ExternalServiceClient:
"""외부 서비스 통신 클라이언트"""
def __init__(self):
self.timeout = httpx.Timeout(settings.external_services.http_timeout)
self.retry_attempts = settings.external_services.retry_attempts
async def _make_request(self, method: str, url: str, **kwargs) -> Optional[httpx.Response]:
"""HTTP 요청 실행 (재시도 로직 포함)"""
for attempt in range(self.retry_attempts):
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.request(method, url, **kwargs)
response.raise_for_status()
return response
except httpx.HTTPStatusError as e:
if e.response.status_code < 500: # 4xx 에러는 재시도하지 않음
logger.error(
"클라이언트 에러 - 재시도 안함",
url=url,
status_code=e.response.status_code,
attempt=attempt + 1
)
return None
if attempt == self.retry_attempts - 1:
logger.error("최대 재시도 횟수 초과", url=url, attempt=attempt + 1)
return None
# 지수 백오프 재시도
wait_time = 2 ** attempt
logger.warning(
"서버 에러 - 재시도 예정",
url=url,
status_code=e.response.status_code,
attempt=attempt + 1,
wait_time=wait_time
)
await asyncio.sleep(wait_time)
except Exception as e:
if attempt == self.retry_attempts - 1:
logger.error("HTTP 요청 실패", url=url, error=str(e), attempt=attempt + 1)
return None
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
return None
class UserService:
"""사용자 서비스 클라이언트"""
def __init__(self, client: ExternalServiceClient):
self.client = client
self.base_url = settings.external_services.user_service_url
@log_execution_time("user_service_verify")
async def verify_user(self, user_id: int) -> bool:
"""사용자 존재 여부 검증"""
url = f"{self.base_url}/users/{user_id}"
response = await self.client._make_request("GET", url)
if response and response.status_code == 200:
logger.info("사용자 검증 성공", user_id=user_id)
return True
logger.warning("사용자 검증 실패", user_id=user_id)
return False
async def get_user_info(self, user_id: int) -> Optional[Dict[str, Any]]:
"""사용자 정보 조회"""
url = f"{self.base_url}/users/{user_id}"
response = await self.client._make_request("GET", url)
if response and response.status_code == 200:
return response.json()
return None
class ProductService:
"""상품 서비스 클라이언트"""
def __init__(self, client: ExternalServiceClient):
self.client = client
self.base_url = settings.external_services.product_service_url
@log_execution_time("product_service_get_price")
async def get_product_info(self, product_id: int) -> Optional[Dict[str, Any]]:
"""상품 정보 조회"""
url = f"{self.base_url}/products/{product_id}"
response = await self.client._make_request("GET", url)
if response and response.status_code == 200:
product_info = response.json()
logger.info("상품 정보 조회 성공", product_id=product_id, price=product_info.get("price"))
return product_info
logger.warning("상품 정보 조회 실패", product_id=product_id)
return None
async def check_inventory(self, product_id: int, quantity: int) -> bool:
"""재고 확인"""
url = f"{self.base_url}/products/{product_id}/inventory"
response = await self.client._make_request("GET", url)
if response and response.status_code == 200:
inventory_info = response.json()
available_quantity = inventory_info.get("available_quantity", 0)
return available_quantity >= quantity
return False
async def reserve_inventory(self, product_id: int, quantity: int, reservation_id: str) -> bool:
"""재고 예약"""
url = f"{self.base_url}/products/{product_id}/reserve"
payload = {
"quantity": quantity,
"reservation_id": reservation_id,
"expires_at": (datetime.utcnow() + timedelta(minutes=15)).isoformat()
}
response = await self.client._make_request("POST", url, json=payload)
return response and response.status_code == 200
class PaymentService:
"""결제 서비스 클라이언트"""
def __init__(self, client: ExternalServiceClient):
self.client = client
self.base_url = settings.external_services.payment_service_url
async def create_payment(self, order_id: int, amount: float, user_id: int) -> Optional[str]:
"""결제 생성"""
url = f"{self.base_url}/payments"
payload = {
"order_id": order_id,
"amount": amount,
"user_id": user_id,
"currency": "KRW"
}
response = await self.client._make_request("POST", url, json=payload)
if response and response.status_code == 201:
payment_info = response.json()
return payment_info.get("payment_id")
return None
class OrderService:
"""주문 처리 핵심 서비스"""
def __init__(self):
self.external_client = ExternalServiceClient()
self.user_service = UserService(self.external_client)
self.product_service = ProductService(self.external_client)
self.payment_service = PaymentService(self.external_client)
def _generate_order_number(self) -> str:
"""주문 번호 생성"""
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
unique_id = str(uuid.uuid4()).replace("-", "")[:8].upper()
return f"ORD{timestamp}{unique_id}"
@log_execution_time("order_creation")
async def create_order(self, order_request: OrderCreateRequest) -> OrderResponse:
"""새 주문 생성 (단일 상품)"""
# 분산 락을 사용한 동시성 제어
lock_resource = f"order_creation_user_{order_request.user_id}"
lock_value = await distributed_lock.acquire_lock(lock_resource, timeout=30)
if not lock_value:
raise Exception("주문 생성 중 동시성 제어 실패 - 잠시 후 다시 시도해주세요")
try:
# 1. 사용자 검증
user_exists = await self.user_service.verify_user(order_request.user_id)
if not user_exists:
raise Exception("존재하지 않는 사용자입니다")
# 2. 상품 정보 조회
product_info = await self.product_service.get_product_info(order_request.product_id)
if not product_info:
raise Exception("존재하지 않는 상품입니다")
# 3. 재고 확인
inventory_available = await self.product_service.check_inventory(
order_request.product_id,
order_request.quantity
)
if not inventory_available:
raise Exception("재고가 부족합니다")
# 4. 총 금액 계산
product_price = product_info["price"]
total_amount = product_price * order_request.quantity
# 5. 재고 예약
reservation_id = str(uuid.uuid4())
reservation_success = await self.product_service.reserve_inventory(
order_request.product_id,
order_request.quantity,
reservation_id
)
if not reservation_success:
raise Exception("재고 예약에 실패했습니다")
# 6. 데이터베이스 트랜잭션으로 주문 생성
with db_manager.get_session_context() as db:
order_number = self._generate_order_number()
db_order = OrderModel(
order_number=order_number,
user_id=order_request.user_id,
product_id=order_request.product_id,
product_name=product_info["name"],
product_price=product_price,
quantity=order_request.quantity,
total_amount=total_amount,
status=OrderStatus.PENDING,
payment_status=PaymentStatus.PENDING,
shipping_address=order_request.shipping_address,
billing_address=order_request.billing_address,
notes=order_request.notes,
metadata=json.dumps({"reservation_id": reservation_id})
)
db.add(db_order)
db.flush() # ID 생성을 위한 플러시
# 7. 결제 생성
payment_id = await self.payment_service.create_payment(
db_order.id,
total_amount,
order_request.user_id
)
if payment_id:
# 결제 ID를 메타데이터에 추가
metadata = json.loads(db_order.metadata or "{}")
metadata["payment_id"] = payment_id
db_order.metadata = json.dumps(metadata)
db.commit()
db.refresh(db_order)
# 8. 응답 데이터 생성
order_response = OrderResponse.from_orm(db_order)
# 9. 캐시 저장
await order_cache.set_order(db_order.id, order_response.dict())
# 10. 이벤트 발행
await order_event_service.publish_order_created({
"id": db_order.id,
"order_number": db_order.order_number,
"user_id": db_order.user_id,
"product_id": db_order.product_id,
"quantity": db_order.quantity,
"total_amount": db_order.total_amount,
"status": db_order.status,
"payment_id": payment_id,
"reservation_id": reservation_id,
"created_at": db_order.created_at.isoformat()
})
# 11. 메트릭 기록
metrics.increment_counter("orders_created_total")
metrics.increment_counter("orders_by_status", {"status": OrderStatus.PENDING})
logger.info(
"주문 생성 완료",
order_id=db_order.id,
order_number=order_number,
user_id=order_request.user_id,
total_amount=total_amount
)
return order_response
finally:
# 분산 락 해제
await distributed_lock.release_lock(lock_resource, lock_value)
@log_execution_time("order_retrieval")
async def get_order(self, order_id: int) -> Optional[OrderResponse]:
"""주문 조회 (캐시 우선)"""
# 1. 캐시에서 먼저 조회
cached_order = await order_cache.get_order(order_id)
if cached_order:
metrics.increment_counter("cache_hits", {"type": "order"})
return OrderResponse(**cached_order)
# 2. 데이터베이스에서 조회
with db_manager.get_session_context() as db:
db_order = db.query(OrderModel).filter(OrderModel.id == order_id).first()
if not db_order:
return None
order_response = OrderResponse.from_orm(db_order)
# 3. 캐시에 저장
await order_cache.set_order(order_id, order_response.dict())
metrics.increment_counter("cache_misses", {"type": "order"})
metrics.increment_counter("orders_retrieved_total")
return order_response
@log_execution_time("order_status_update")
async def update_order_status(self, order_id: int, status_update: OrderStatusUpdateRequest) -> OrderResponse:
"""주문 상태 업데이트"""
with db_manager.get_session_context() as db:
db_order = db.query(OrderModel).filter(OrderModel.id == order_id).first()
if not db_order:
raise Exception("주문을 찾을 수 없습니다")
# 상태 변경 검증 로직
old_status = db_order.status
new_status = status_update.status
if not self._is_valid_status_transition(old_status, new_status):
raise Exception(f"잘못된 상태 전환: {old_status} -> {new_status}")
# 상태 업데이트
db_order.status = new_status
db_order.updated_at = datetime.utcnow()
# 상태별 타임스탬프 업데이트
if new_status == OrderStatus.CONFIRMED:
db_order.confirmed_at = datetime.utcnow()
elif new_status == OrderStatus.SHIPPED:
db_order.shipped_at = datetime.utcnow()
elif new_status == OrderStatus.DELIVERED:
db_order.delivered_at = datetime.utcnow()
elif new_status == OrderStatus.CANCELLED:
db_order.cancelled_at = datetime.utcnow()
if status_update.notes:
existing_notes = db_order.notes or ""
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
db_order.notes = f"{existing_notes}\n[{timestamp}] {status_update.notes}".strip()
db.commit()
db.refresh(db_order)
# 캐시 무효화
await order_cache.delete_order(order_id)
# 응답 생성
order_response = OrderResponse.from_orm(db_order)
# 이벤트 발행
await order_event_service.publish_order_status_changed({
"id": db_order.id,
"order_number": db_order.order_number,
"old_status": old_status,
"new_status": new_status,
"updated_at": db_order.updated_at.isoformat(),
"notes": status_update.notes
})
# 메트릭 업데이트
metrics.increment_counter("order_status_updates_total")
metrics.increment_counter("orders_by_status", {"status": new_status})
logger.info(
"주문 상태 업데이트 완료",
order_id=order_id,
old_status=old_status,
new_status=new_status
)
return order_response
def _is_valid_status_transition(self, old_status: OrderStatus, new_status: OrderStatus) -> bool:
"""주문 상태 전환 유효성 검증"""
valid_transitions = {
OrderStatus.PENDING: [OrderStatus.CONFIRMED, OrderStatus.CANCELLED],
OrderStatus.CONFIRMED: [OrderStatus.PAID, OrderStatus.CANCELLED],
OrderStatus.PAID: [OrderStatus.PROCESSING, OrderStatus.CANCELLED],
OrderStatus.PROCESSING: [OrderStatus.SHIPPED, OrderStatus.CANCELLED],
OrderStatus.SHIPPED: [OrderStatus.DELIVERED],
OrderStatus.DELIVERED: [OrderStatus.REFUNDED],
OrderStatus.CANCELLED: [],
OrderStatus.REFUNDED: []
}
return new_status in valid_transitions.get(old_status, [])
async def list_orders(
self,
user_id: Optional[int] = None,
status: Optional[OrderStatus] = None,
limit: int = 10,
offset: int = 0
) -> Tuple[List[OrderResponse], int]:
"""주문 목록 조회 (필터링 및 페이징)"""
with db_manager.get_session_context() as db:
# 기본 쿼리
query = db.query(OrderModel)
count_query = db.query(func.count(OrderModel.id))
# 필터 적용
if user_id:
query = query.filter(OrderModel.user_id == user_id)
count_query = count_query.filter(OrderModel.user_id == user_id)
if status:
query = query.filter(OrderModel.status == status)
count_query = count_query.filter(OrderModel.status == status)
# 총 개수 조회
total_count = count_query.scalar()
# 페이징 및 정렬
orders = (
query
.order_by(desc(OrderModel.created_at))
.offset(offset)
.limit(limit)
.all()
)
order_responses = [OrderResponse.from_orm(order) for order in orders]
metrics.increment_counter("order_list_queries_total")
return order_responses, total_count
async def cancel_order(self, order_id: int, reason: str) -> OrderResponse:
"""주문 취소"""
with db_manager.get_session_context() as db:
db_order = db.query(OrderModel).filter(OrderModel.id == order_id).first()
if not db_order:
raise Exception("주문을 찾을 수 없습니다")
if db_order.status in [OrderStatus.CANCELLED, OrderStatus.DELIVERED, OrderStatus.REFUNDED]:
raise Exception("취소할 수 없는 주문 상태입니다")
# 주문 취소 처리
db_order.status = OrderStatus.CANCELLED
db_order.cancelled_at = datetime.utcnow()
db_order.updated_at = datetime.utcnow()
# 취소 사유 추가
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
cancel_note = f"[{timestamp}] 주문 취소: {reason}"
existing_notes = db_order.notes or ""
db_order.notes = f"{existing_notes}\n{cancel_note}".strip()
db.commit()
db.refresh(db_order)
# 캐시 무효화
await order_cache.delete_order(order_id)
# 재고 해제 및 결제 취소를 위한 이벤트 발행
await order_event_service.publish_order_cancelled({
"id": db_order.id,
"order_number": db_order.order_number,
"user_id": db_order.user_id,
"product_id": db_order.product_id,
"quantity": db_order.quantity,
"reason": reason,
"cancelled_at": db_order.cancelled_at.isoformat()
})
order_response = OrderResponse.from_orm(db_order)
logger.info("주문 취소 완료", order_id=order_id, reason=reason)
return order_response
# 전역 서비스 인스턴스
order_service = OrderService()
|
API 레이어 (API Layer)#
main.py#
FastAPI 기반의 REST API 엔드포인트를 정의한다. HTTP 미들웨어, 에러 핸들링, 의존성 주입, 헬스 체크, 메트릭 수집 등의 기능을 포함한다. OpenAPI 문서 자동 생성, CORS 설정, 보안 미들웨어 등 웹 애플리케이션에 필요한 모든 기능을 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
| # FastAPI 메인 애플리케이션 모듈
# HTTP API 엔드포인트, 미들웨어, 의존성 주입 설정
from fastapi import FastAPI, HTTPException, Depends, status, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import datetime
import time
import traceback
from config import settings
from database import db_manager, get_db
from models import (
OrderCreateRequest, OrderResponse, OrderStatusUpdateRequest,
OrderListResponse, OrderSummaryResponse, OrderStatus
)
from services import order_service
from cache import health_check_redis
from messaging import event_publisher, setup_event_consumers
from utils import logger, metrics, health_checker
# FastAPI 애플리케이션 생성
app = FastAPI(
title=settings.app.app_name,
description="클라우드 네이티브 주문 처리 마이크로서비스 - 확장 가능하고 안정적인 전자상거래 주문 관리",
version=settings.app.app_version,
docs_url=settings.app.docs_url if not settings.is_production else None,
redoc_url=settings.app.redoc_url if not settings.is_production else None,
openapi_url="/openapi.json" if not settings.is_production else None
)
# 미들웨어 설정
@app.middleware("http")
async def request_middleware(request: Request, call_next):
"""요청 처리 미들웨어 (로깅, 메트릭, 성능 측정)"""
start_time = time.time()
request_id = request.headers.get("X-Request-ID", f"req_{int(time.time() * 1000)}")
# 요청 로깅
logger.info(
"HTTP 요청 시작",
method=request.method,
url=str(request.url),
request_id=request_id,
user_agent=request.headers.get("user-agent"),
remote_addr=request.client.host if request.client else None
)
try:
# 요청 처리
response = await call_next(request)
# 처리 시간 계산
process_time = time.time() - start_time
# 응답 헤더 추가
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(process_time)
# 메트릭 기록
metrics.record_histogram("http_request_duration", process_time, {
"method": request.method,
"status_code": str(response.status_code)
})
metrics.increment_counter("http_requests_total", {
"method": request.method,
"status_code": str(response.status_code)
})
# 응답 로깅
logger.info(
"HTTP 요청 완료",
method=request.method,
url=str(request.url),
status_code=response.status_code,
process_time=process_time,
request_id=request_id
)
return response
except Exception as e:
process_time = time.time() - start_time
# 에러 메트릭 기록
metrics.increment_counter("http_request_errors_total", {
"method": request.method,
"error_type": type(e).__name__
})
# 에러 로깅
logger.error(
"HTTP 요청 에러",
method=request.method,
url=str(request.url),
error=str(e),
process_time=process_time,
request_id=request_id,
traceback=traceback.format_exc()
)
raise
# CORS 미들웨어
app.add_middleware(
CORSMiddleware,
allow_origins=settings.app.cors_origins,
allow_credentials=True,
allow_methods=settings.app.cors_methods,
allow_headers=settings.app.cors_headers,
)
# 신뢰할 수 있는 호스트 미들웨어 (운영 환경)
if settings.is_production:
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["*.example.com", "localhost"] # 실제 도메인으로 변경
)
# 애플리케이션 시작/종료 이벤트
@app.on_event("startup")
async def startup_event():
"""애플리케이션 시작 시 초기화"""
try:
# 데이터베이스 테이블 생성
db_manager.create_tables()
# Kafka 프로듀서 시작
await event_publisher.start()
# 이벤트 컨슈머 설정 (백그라운드 태스크)
await setup_event_consumers()
# 헬스 체크 등록
health_checker.register_check("database", db_manager.health_check)
health_checker.register_check("redis", health_check_redis)
logger.info(
"Order Service 시작 완료",
environment=settings.app.environment,
version=settings.app.app_version
)
except Exception as e:
logger.error("애플리케이션 시작 실패", error=str(e))
raise
@app.on_event("shutdown")
async def shutdown_event():
"""애플리케이션 종료 시 정리"""
try:
# Kafka 프로듀서 종료
await event_publisher.stop()
# 데이터베이스 연결 종료
db_manager.close()
logger.info("Order Service 종료 완료")
except Exception as e:
logger.error("애플리케이션 종료 중 에러", error=str(e))
# 에러 핸들러
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""HTTP 예외 처리"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.status_code,
"message": exc.detail,
"timestamp": datetime.utcnow().isoformat(),
"path": str(request.url)
}
}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""요청 검증 예외 처리"""
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error": {
"code": 422,
"message": "요청 데이터 검증 실패",
"details": exc.errors(),
"timestamp": datetime.utcnow().isoformat(),
"path": str(request.url)
}
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""일반 예외 처리"""
logger.error(
"예상치 못한 에러 발생",
error=str(exc),
path=str(request.url),
traceback=traceback.format_exc()
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": {
"code": 500,
"message": "내부 서버 오류가 발생했습니다",
"timestamp": datetime.utcnow().isoformat(),
"path": str(request.url)
}
}
)
# 헬스 체크 엔드포인트
@app.get("/health", tags=["Health"])
async def health_check():
"""헬스 체크 엔드포인트 (Kubernetes Liveness/Readiness Probe)"""
try:
health_results = await health_checker.run_checks()
if health_results["status"] == "healthy":
return health_results
else:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=health_results
)
except Exception as e:
logger.error("헬스 체크 실패", error=str(e))
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={"status": "unhealthy", "error": str(e)}
)
@app.get("/metrics", tags=["Monitoring"])
async def get_metrics():
"""메트릭 엔드포인트 (Prometheus 수집용)"""
return metrics.get_metrics()
@app.get("/info", tags=["Info"])
async def get_info():
"""서비스 정보 조회"""
return {
"service": settings.app.app_name,
"version": settings.app.app_version,
"environment": settings.app.environment,
"timestamp": datetime.utcnow().isoformat()
}
# 주문 API 엔드포인트
@app.post(
"/orders",
response_model=OrderResponse,
status_code=status.HTTP_201_CREATED,
tags=["Orders"],
summary="새 주문 생성",
description="사용자가 상품을 주문할 때 새로운 주문을 생성합니다. 사용자 검증, 상품 확인, 재고 체크를 포함합니다."
)
async def create_order(order_request: OrderCreateRequest):
"""새 주문 생성"""
try:
order_response = await order_service.create_order(order_request)
return order_response
except Exception as e:
logger.error("주문 생성 실패", error=str(e), user_id=order_request.user_id)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@app.get(
"/orders/{order_id}",
response_model=OrderResponse,
tags=["Orders"],
summary="주문 조회",
description="주문 ID로 특정 주문의 상세 정보를 조회합니다."
)
async def get_order(order_id: int):
"""주문 상세 조회"""
order_response = await order_service.get_order(order_id)
if not order_response:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="주문을 찾을 수 없습니다"
)
return order_response
@app.put(
"/orders/{order_id}/status",
response_model=OrderResponse,
tags=["Orders"],
summary="주문 상태 업데이트",
description="주문의 상태를 업데이트합니다. 상태 전환 규칙에 따라 유효성을 검증합니다."
)
async def update_order_status(order_id: int, status_update: OrderStatusUpdateRequest):
"""주문 상태 업데이트"""
try:
order_response = await order_service.update_order_status(order_id, status_update)
return order_response
except Exception as e:
logger.error("주문 상태 업데이트 실패", error=str(e), order_id=order_id)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@app.get(
"/orders",
response_model=OrderListResponse,
tags=["Orders"],
summary="주문 목록 조회",
description="필터링과 페이징을 지원하는 주문 목록을 조회합니다."
)
async def list_orders(
user_id: Optional[int] = None,
status: Optional[OrderStatus] = None,
page: int = 1,
page_size: int = 10
):
"""주문 목록 조회 (필터링 및 페이징)"""
if page < 1:
page = 1
if page_size < 1 or page_size > 100:
page_size = 10
offset = (page - 1) * page_size
try:
orders, total_count = await order_service.list_orders(
user_id=user_id,
status=status,
limit=page_size,
offset=offset
)
has_next = offset + page_size < total_count
has_previous = page > 1
return OrderListResponse(
orders=orders,
total_count=total_count,
page=page,
page_size=page_size,
has_next=has_next,
has_previous=has_previous
)
except Exception as e:
logger.error("주문 목록 조회 실패", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="주문 목록 조회 중 오류가 발생했습니다"
)
@app.delete(
"/orders/{order_id}",
response_model=OrderResponse,
tags=["Orders"],
summary="주문 취소",
description="주문을 취소합니다. 취소 가능한 상태의 주문만 취소할 수 있습니다."
)
async def cancel_order(order_id: int, reason: str = "사용자 요청"):
"""주문 취소"""
try:
order_response = await order_service.cancel_order(order_id, reason)
return order_response
except Exception as e:
logger.error("주문 취소 실패", error=str(e), order_id=order_id)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@app.get(
"/orders/summary",
response_model=OrderSummaryResponse,
tags=["Analytics"],
summary="주문 요약 통계",
description="전체 주문 통계 정보를 제공합니다."
)
async def get_order_summary():
"""주문 요약 통계"""
# 이 기능은 실제 구현에서는 별도의 분석 서비스나 읽기 전용 복제본을 사용하는 것이 좋습니다
return OrderSummaryResponse(
total_orders=0,
pending_orders=0,
completed_orders=0,
cancelled_orders=0,
total_revenue=0.0,
average_order_value=0.0
)
# 개발 환경에서만 사용하는 디버그 엔드포인트
if settings.is_development:
@app.get("/debug/cache/{order_id}", tags=["Debug"])
async def debug_get_cache(order_id: int):
"""캐시 디버그 조회 (개발 환경 전용)"""
from cache import order_cache
cached_data = await order_cache.get_order(order_id)
return {"cached_data": cached_data}
@app.delete("/debug/cache/{order_id}", tags=["Debug"])
async def debug_clear_cache(order_id: int):
"""캐시 디버그 삭제 (개발 환경 전용)"""
from cache import order_cache
result = await order_cache.delete_order(order_id)
return {"deleted": result}
if __name__ == "__main__":
import uvicorn
# 개발 환경에서는 uvicorn으로 직접 실행
# 운영 환경에서는 Gunicorn + Uvicorn 조합 사용 권장
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=settings.is_development,
log_level=settings.app.log_level.lower(),
access_log=True
)
|
쿠버네티스 매니페스트 파일들#
네임스페이스 및 리소스 관리#
k8s/namespace.yaml: 서비스들을 격리하고 리소스 쿼터를 설정하여 멀티테넌시를 지원한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| # 네임스페이스 정의 - 리소스 격리 및 관리
apiVersion: v1
kind: Namespace
metadata:
name: ecommerce
labels:
name: ecommerce
environment: production
managed-by: kubectl
annotations:
description: "전자상거래 마이크로서비스를 위한 네임스페이스"
---
# 리소스 쿼터 설정 - 네임스페이스별 리소스 제한
apiVersion: v1
kind: ResourceQuota
metadata:
name: ecommerce-quota
namespace: ecommerce
spec:
hard:
requests.cpu: "4" # 총 CPU 요청량 제한
requests.memory: "8Gi" # 총 메모리 요청량 제한
limits.cpu: "8" # 총 CPU 제한량
limits.memory: "16Gi" # 총 메모리 제한량
pods: "20" # 최대 Pod 수
services: "10" # 최대 Service 수
persistentvolumeclaims: "5" # 최대 PVC 수
|
Order Service 배포#
k8s/order-service/configmap.yaml: 환경별 설정 정보를 외부화하여 컨테이너 이미지 재빌드 없이 설정 변경이 가능하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| # Order Service 설정 정보 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: order-service-config
namespace: ecommerce
labels:
app: order-service
component: config
data:
# 데이터베이스 연결 설정
DATABASE_URL: "postgresql://orderuser@postgres-service:5432/orderdb"
DB_ECHO: "false"
# Redis 캐시 설정
REDIS_URL: "redis://redis-service:6379/0"
# Kafka 메시징 설정
KAFKA_SERVERS: "kafka-service:9092"
KAFKA_TOPIC_PREFIX: "ecommerce"
# 외부 서비스 URL 설정
USER_SERVICE_URL: "http://user-service:8080"
PRODUCT_SERVICE_URL: "http://product-service:8080"
PAYMENT_SERVICE_URL: "http://payment-service:8080"
# 애플리케이션 설정
ENVIRONMENT: "production"
LOG_LEVEL: "INFO"
DEBUG: "false"
# API 설정
API_PREFIX: "/api/v1"
# 타임아웃 및 재시도 설정
HTTP_TIMEOUT: "30"
RETRY_ATTEMPTS: "3"
CACHE_TTL: "300"
|
k8s/order-service/secret.yaml: 데이터베이스 비밀번호, API 키 등 민감한 정보를 Base64 로 인코딩하여 안전하게 저장한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| # Order Service 민감 정보 Secret
apiVersion: v1
kind: Secret
metadata:
name: order-service-secret
namespace: ecommerce
labels:
app: order-service
component: secret
type: Opaque
data:
# 데이터베이스 비밀번호 (base64 인코딩)
DB_PASSWORD: cGFzc3dvcmQxMjM= # password123
# Kafka 인증 정보
KAFKA_API_KEY: a2Fma2Ffa2V5XzEyMw== # kafka_key_123
KAFKA_API_SECRET: a2Fma2Ffc2VjcmV0XzEyMw== # kafka_secret_123
# 외부 서비스 API 키
USER_SERVICE_API_KEY: dXNlcl9hcGlfa2V5XzEyMw== # user_api_key_123
PRODUCT_SERVICE_API_KEY: cHJvZHVjdF9hcGlfa2V5XzEyMw== # product_api_key_123
PAYMENT_SERVICE_API_KEY: cGF5bWVudF9hcGlfa2V5XzEyMw== # payment_api_key_123
# JWT 시크릿 키
JWT_SECRET: and0X3NlY3JldF9rZXlfMTIz # jwt_secret_key_123
|
k8s/order-service/deployment.yaml: 애플리케이션의 배포 전략, 리소스 제한, 헬스 체크, 보안 설정을 정의한다. 무중단 배포 (Rolling Update) 와 Pod 분산 배치를 지원한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
| # Order Service 애플리케이션 배포 정의
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-service
namespace: ecommerce
labels:
app: order-service
version: v1
component: api
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0 # 무중단 배포 보장
selector:
matchLabels:
app: order-service
version: v1
template:
metadata:
labels:
app: order-service
version: v1
component: api
annotations:
# Istio 사이드카 주입
sidecar.istio.io/inject: "true"
# Prometheus 스크래핑 설정
prometheus.io/scrape: "true"
prometheus.io/port: "8000"
prometheus.io/path: "/metrics"
# 설정 체크섬 (ConfigMap 변경 감지)
checksum/config: "{{ .Values.configChecksum }}"
spec:
serviceAccountName: order-service-sa
# 초기화 컨테이너 (데이터베이스 마이그레이션)
initContainers:
- name: db-migration
image: myregistry/order-service:v1.2.3
command: ['python', '-c', 'from database import db_manager; db_manager.create_tables()']
env:
- name: DATABASE_URL
valueFrom:
configMapKeyRef:
name: order-service-config
key: DATABASE_URL
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: order-service-secret
key: DB_PASSWORD
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
containers:
- name: order-service
image: myregistry/order-service:v1.2.3
imagePullPolicy: Always
ports:
- containerPort: 8000
name: http
protocol: TCP
# 환경 변수 설정
env:
# ConfigMap에서 주입
- name: DATABASE_URL
valueFrom:
configMapKeyRef:
name: order-service-config
key: DATABASE_URL
- name: REDIS_URL
valueFrom:
configMapKeyRef:
name: order-service-config
key: REDIS_URL
- name: KAFKA_SERVERS
valueFrom:
configMapKeyRef:
name: order-service-config
key: KAFKA_SERVERS
- name: ENVIRONMENT
valueFrom:
configMapKeyRef:
name: order-service-config
key: ENVIRONMENT
- name: LOG_LEVEL
valueFrom:
configMapKeyRef:
name: order-service-config
key: LOG_LEVEL
# Secret에서 주입
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: order-service-secret
key: DB_PASSWORD
- name: KAFKA_API_KEY
valueFrom:
secretKeyRef:
name: order-service-secret
key: KAFKA_API_KEY
# 리소스 제한
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
# 헬스 체크 설정
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
failureThreshold: 3
successThreshold: 1
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
successThreshold: 1
# 시작 프로브 (느린 시작 대응)
startupProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 10
# 보안 컨텍스트
securityContext:
allowPrivilegeEscalation: false
runAsNonRoot: true
runAsUser: 1001
runAsGroup: 1001
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
# 볼륨 마운트
volumeMounts:
- name: tmp
mountPath: /tmp
- name: app-logs
mountPath: /app/logs
- name: cache
mountPath: /app/cache
# 볼륨 정의
volumes:
- name: tmp
emptyDir:
sizeLimit: 100Mi
- name: app-logs
emptyDir:
sizeLimit: 500Mi
- name: cache
emptyDir:
sizeLimit: 200Mi
# Pod 스케줄링 및 보안 설정
securityContext:
fsGroup: 1001
runAsNonRoot: true
# Node 선택 및 톨러레이션
nodeSelector:
kubernetes.io/arch: amd64
node-type: worker
# Pod 분산 배치 (고가용성)
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- order-service
topologyKey: kubernetes.io/hostname
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: node-type
operator: In
values:
- worker
# 톨러레이션 (특정 노드에서 실행 허용)
tolerations:
- key: node.kubernetes.io/not-ready
operator: Exists
effect: NoExecute
tolerationSeconds: 300
- key: node.kubernetes.io/unreachable
operator: Exists
effect: NoExecute
tolerationSeconds: 300
|
k8s/order-service/service.yaml: 네트워크 서비스 디스커버리와 로드 밸런싱을 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| # Order Service 네트워크 서비스 정의
apiVersion: v1
kind: Service
metadata:
name: order-service
namespace: ecommerce
labels:
app: order-service
service: order-service
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: "nlb" # AWS NLB 사용 시
spec:
type: ClusterIP # 클러스터 내부 전용
sessionAffinity: None # 세션 고정 없음 (스테이트리스)
ports:
- name: http
port: 8080 # 서비스 포트
targetPort: 8000 # 컨테이너 포트
protocol: TCP
selector:
app: order-service
---
# 헤드리스 서비스 (서비스 디스커버리용)
apiVersion: v1
kind: Service
metadata:
name: order-service-headless
namespace: ecommerce
labels:
app: order-service
service: order-service-headless
spec:
type: ClusterIP
clusterIP: None # 헤드리스 서비스
ports:
- name: http
port: 8080
targetPort: 8000
protocol: TCP
selector:
app: order-service
|
k8s/order-service/hpa.yaml: CPU, 메모리, 커스텀 메트릭 기반의 자동 스케일링을 설정한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| # 수평적 Pod 자동 스케일러 (HPA)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: order-service-hpa
namespace: ecommerce
labels:
app: order-service
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: order-service
minReplicas: 3
maxReplicas: 20
# 스케일링 메트릭
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
# 커스텀 메트릭 (Prometheus 기반)
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "100" # 초당 100 요청
# 스케일링 동작 제어
behavior:
scaleUp:
stabilizationWindowSeconds: 60 # 60초 안정화 기간
policies:
- type: Percent
value: 50 # 50% 증가
periodSeconds: 60
- type: Pods
value: 2 # 최대 2개 Pod 추가
periodSeconds: 60
selectPolicy: Max # 더 큰 값 선택
scaleDown:
stabilizationWindowSeconds: 300 # 5분 안정화 기간
policies:
- type: Percent
value: 25 # 25% 감소
periodSeconds: 60
- type: Pods
value: 1 # 최대 1개 Pod 제거
periodSeconds: 60
selectPolicy: Min # 더 작은 값 선택
|
k8s/order-service/pdb.yaml: 클러스터 유지보수 중에도 최소한의 서비스 가용성을 보장한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| # Pod 중단 예산 (Pod Disruption Budget)
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: order-service-pdb
namespace: ecommerce
labels:
app: order-service
spec:
minAvailable: 2 # 최소 2개 Pod 유지
# 또는 maxUnavailable: 1 # 최대 1개 Pod 중단 허용
selector:
matchLabels:
app: order-service
|
모니터링#
k8s/order-service/servicemonitor.yaml: Prometheus 메트릭 수집 설정을 통해 애플리케이션 성능과 상태를 모니터링한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| # Prometheus ServiceMonitor (메트릭 수집)
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: order-service-monitor
namespace: ecommerce
labels:
app: order-service
prometheus: monitoring
spec:
selector:
matchLabels:
app: order-service
endpoints:
- port: http
path: /metrics
interval: 30s
scrapeTimeout: 10s
honorLabels: true
# 메트릭 재라벨링
metricRelabelings:
- sourceLabels: [__name__]
regex: 'http_request_duration_bucket'
targetLabel: __name__
replacement: 'order_service_http_request_duration_bucket'
# 샘플 제한
sampleLimit: 10000
targetLimit: 100
|
데이터베이스 (PostgreSQL)#
k8s/postgres/configmap.yaml: PostgreSQL 최적화 설정과 보안 규칙을 정의한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| # PostgreSQL 설정 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-config
namespace: ecommerce
labels:
app: postgres
data:
POSTGRES_DB: "orderdb"
POSTGRES_USER: "orderuser"
PGDATA: "/var/lib/postgresql/data/pgdata"
# PostgreSQL 설정 파일
postgresql.conf: |
# 연결 설정
listen_addresses = '*'
port = 5432
max_connections = 200
# 메모리 설정
shared_buffers = '256MB'
effective_cache_size = '1GB'
work_mem = '4MB'
maintenance_work_mem = '64MB'
# WAL 설정
wal_level = replica
max_wal_size = '1GB'
min_wal_size = '80MB'
checkpoint_completion_target = 0.9
# 로깅 설정
log_destination = 'stderr'
logging_collector = on
log_directory = '/var/log/postgresql'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_min_duration_statement = 1000
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
# 성능 튜닝
random_page_cost = 1.1
effective_io_concurrency = 200
pg_hba.conf: |
# TYPE DATABASE USER ADDRESS METHOD
local all all trust
host all all 127.0.0.1/32 md5
host all all ::1/128 md5
host all all 0.0.0.0/0 md5
|
k8s/postgres/secret.yaml: 데이터베이스 비밀번호, API 키 등 민감한 정보를 Base64 로 인코딩하여 안전하게 저장한다.
1
2
3
4
5
6
7
8
9
10
11
12
| # PostgreSQL 비밀번호 Secret
apiVersion: v1
kind: Secret
metadata:
name: postgres-secret
namespace: ecommerce
labels:
app: postgres
type: Opaque
data:
POSTGRES_PASSWORD: cGFzc3dvcmQxMjM= # password123 (base64)
POSTGRES_ROOT_PASSWORD: cm9vdHBhc3N3b3JkMTIz # rootpassword123 (base64)
|
k8s/postgres/statefulset.yaml: 상태 보존이 필요한 데이터베이스를 위한 StatefulSet 을 사용하여 영구 스토리지와 안정적인 네트워크 식별자를 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
| # PostgreSQL StatefulSet (상태 보존 워크로드)
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres
namespace: ecommerce
labels:
app: postgres
spec:
serviceName: postgres-headless
replicas: 1 # 단일 인스턴스 (고가용성을 위해서는 PostgreSQL 클러스터 구성 필요)
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15-alpine
ports:
- containerPort: 5432
name: postgres
env:
- name: POSTGRES_DB
valueFrom:
configMapKeyRef:
name: postgres-config
key: POSTGRES_DB
- name: POSTGRES_USER
valueFrom:
configMapKeyRef:
name: postgres-config
key: POSTGRES_USER
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secret
key: POSTGRES_PASSWORD
- name: PGDATA
valueFrom:
configMapKeyRef:
name: postgres-config
key: PGDATA
# 리소스 제한
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
# 헬스 체크
livenessProbe:
exec:
command:
- /bin/sh
- -c
- pg_isready -U $POSTGRES_USER -d $POSTGRES_DB
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
exec:
command:
- /bin/sh
- -c
- pg_isready -U $POSTGRES_USER -d $POSTGRES_DB
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
# 볼륨 마운트
volumeMounts:
- name: postgres-data
mountPath: /var/lib/postgresql/data
- name: postgres-config-volume
mountPath: /etc/postgresql/postgresql.conf
subPath: postgresql.conf
- name: postgres-config-volume
mountPath: /etc/postgresql/pg_hba.conf
subPath: pg_hba.conf
- name: postgres-logs
mountPath: /var/log/postgresql
volumes:
- name: postgres-config-volume
configMap:
name: postgres-config
- name: postgres-logs
emptyDir: {}
# 보안 설정
securityContext:
fsGroup: 999 # postgres 그룹
# 영구 볼륨 클레임 템플릿
volumeClaimTemplates:
- metadata:
name: postgres-data
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: "fast-ssd" # 실제 스토리지 클래스로 변경
resources:
requests:
storage: 20Gi
|
k8s/postgres/service.yaml: 네트워크 서비스 디스커버리와 로드 밸런싱을 제공한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| # PostgreSQL 서비스
apiVersion: v1
kind: Service
metadata:
name: postgres-service
namespace: ecommerce
labels:
app: postgres
spec:
type: ClusterIP
ports:
- port: 5432
targetPort: 5432
protocol: TCP
name: postgres
selector:
app: postgres
---
# PostgreSQL 헤드리스 서비스
apiVersion: v1
kind: Service
metadata:
name: postgres-headless
namespace: ecommerce
labels:
app: postgres
spec:
type: ClusterIP
clusterIP: None
ports:
- port: 5432
targetPort: 5432
protocol: TCP
name: postgres
selector:
app: postgres
|
캐시 (Redis)#
- deployment.yaml:
- configmap.yaml:
k8s/redis/configmap.yaml: Redis 성능 튜닝과 지속성 설정을 포함한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| # Redis 설정 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: redis-config
namespace: ecommerce
labels:
app: redis
data:
redis.conf: |
# 네트워크 설정
bind 0.0.0.0
port 6379
protected-mode no
# 메모리 설정
maxmemory 512mb
maxmemory-policy allkeys-lru
# 지속성 설정 (AOF)
appendonly yes
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
# RDB 설정
save 900 1
save 300 10
save 60 10000
# 로깅
loglevel notice
logfile ""
# 클라이언트 설정
timeout 300
tcp-keepalive 60
tcp-backlog 511
# 성능 튜닝
hz 10
# 보안 (실제 운영에서는 비밀번호 설정 필요)
# requirepass yourpassword
|
k8s/redis/deployment.yaml: 인메모리 캐시 서버의 배포와 설정을 관리한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
| # Redis 배포 정의
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
namespace: ecommerce
labels:
app: redis
spec:
replicas: 1 # Redis는 단일 인스턴스 (클러스터 구성 시 변경)
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
name: redis
command:
- redis-server
- /etc/redis/redis.conf
# 리소스 제한
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
# 헬스 체크
livenessProbe:
exec:
command:
- redis-cli
- ping
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
exec:
command:
- redis-cli
- ping
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
# 볼륨 마운트
volumeMounts:
- name: redis-config
mountPath: /etc/redis/redis.conf
subPath: redis.conf
- name: redis-data
mountPath: /data
volumes:
- name: redis-config
configMap:
name: redis-config
- name: redis-data
emptyDir: {} # 실제 운영에서는 PVC 사용 권장
|
k8s/redis/service.yaml: 네트워크 서비스 디스커버리와 로드 밸런싱을 제공한다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # Redis 서비스
apiVersion: v1
kind: Service
metadata:
name: redis-service
namespace: ecommerce
labels:
app: redis
spec:
type: ClusterIP
ports:
- port: 6379
targetPort: 6379
protocol: TCP
name: redis
selector:
app: redis
|
메시징 (Kafka)#
k8s/kafka/configmap.yaml: 토픽 설정, 파티셔닝, 복제 등의 Kafka 클러스터 설정을 관리한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| # Kafka 설정 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config
namespace: ecommerce
labels:
app: kafka
data:
server.properties: |
# 브로커 설정
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-service:9092
# 로그 설정
log.dirs=/opt/kafka/logs
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 토픽 설정
num.partitions=3
default.replication.factor=1
min.insync.replicas=1
# 로그 보존 설정
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Zookeeper 연결
zookeeper.connect=zookeeper-service:2181
zookeeper.connection.timeout.ms=18000
# 기타 설정
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
---
# Zookeeper 설정
apiVersion: v1
kind: ConfigMap
metadata:
name: zookeeper-config
namespace: ecommerce
labels:
app: zookeeper
data:
zoo.cfg: |
tickTime=2000
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
clientPort=2181
initLimit=5
syncLimit=2
server.1=0.0.0.0:2888:3888
|
k8s/kafka/deployment.yaml: Kafka 와 Zookeeper 의 배포를 정의하며, 이벤트 스트리밍을 위한 설정을 포함한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
| # Zookeeper 배포 (Kafka 의존성)
apiVersion: apps/v1
kind: Deployment
metadata:
name: zookeeper
namespace: ecommerce
labels:
app: zookeeper
spec:
replicas: 1
selector:
matchLabels:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: confluentinc/cp-zookeeper:7.4.0
ports:
- containerPort: 2181
name: client
- containerPort: 2888
name: server
- containerPort: 3888
name: leader-election
env:
- name: ZOOKEEPER_CLIENT_PORT
value: "2181"
- name: ZOOKEEPER_TICK_TIME
value: "2000"
- name: ZOOKEEPER_SERVER_ID
value: "1"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
volumeMounts:
- name: zookeeper-data
mountPath: /opt/zookeeper/data
- name: zookeeper-logs
mountPath: /opt/zookeeper/logs
volumes:
- name: zookeeper-data
emptyDir: {}
- name: zookeeper-logs
emptyDir: {}
---
# Kafka 배포
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka
namespace: ecommerce
labels:
app: kafka
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:7.4.0
ports:
- containerPort: 9092
name: kafka
env:
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_ZOOKEEPER_CONNECT
value: "zookeeper-service:2181"
- name: KAFKA_ADVERTISED_LISTENERS
value: "PLAINTEXT://kafka-service:9092"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "1"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
exec:
command:
- sh
- -c
- "kafka-broker-api-versions --bootstrap-server localhost:9092"
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
failureThreshold: 3
readinessProbe:
exec:
command:
- sh
- -c
- "kafka-broker-api-versions --bootstrap-server localhost:9092"
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
volumeMounts:
- name: kafka-data
mountPath: /opt/kafka/logs
volumes:
- name: kafka-data
emptyDir: {}
|
k8s/kafka/service.yaml: 네트워크 서비스 디스커버리와 로드 밸런싱을 제공한다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| # Zookeeper 서비스
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service
namespace: ecommerce
labels:
app: zookeeper
spec:
type: ClusterIP
ports:
- port: 2181
targetPort: 2181
protocol: TCP
name: client
selector:
app: zookeeper
---
# Kafka 서비스
apiVersion: v1
kind: Service
metadata:
name: kafka-service
namespace: ecommerce
labels:
app: kafka
spec:
type: ClusterIP
ports:
- port: 9092
targetPort: 9092
protocol: TCP
name: kafka
selector:
app: kafka
|
보안 및 네트워킹#
k8s/rbac.yaml: 서비스 계정과 권한을 정의하여 최소 권한 원칙을 적용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| # 서비스 계정 및 RBAC 설정
apiVersion: v1
kind: ServiceAccount
metadata:
name: order-service-sa
namespace: ecommerce
labels:
app: order-service
---
# 클러스터 역할 정의
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: order-service-role
rules:
- apiGroups: [""]
resources: ["configmaps", "secrets"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list"]
---
# 클러스터 역할 바인딩
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: order-service-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: order-service-role
subjects:
- kind: ServiceAccount
name: order-service-sa
namespace: ecommerce
|
k8s/network-policy.yaml: 네트워크 보안 정책을 통해 Pod 간 통신을 제어하고 제로 트러스트 네트워킹을 구현한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| # 네트워크 보안 정책
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: order-service-network-policy
namespace: ecommerce
spec:
podSelector:
matchLabels:
app: order-service
policyTypes:
- Ingress
- Egress
# 인그레스 규칙 (들어오는 트래픽)
ingress:
- from:
- namespaceSelector:
matchLabels:
name: ecommerce
- namespaceSelector:
matchLabels:
name: istio-system
- namespaceSelector:
matchLabels:
name: monitoring
ports:
- protocol: TCP
port: 8000
# 이그레스 규칙 (나가는 트래픽)
egress:
# 같은 네임스페이스 내 서비스 통신
- to:
- namespaceSelector:
matchLabels:
name: ecommerce
ports:
- protocol: TCP
port: 5432 # PostgreSQL
- protocol: TCP
port: 6379 # Redis
- protocol: TCP
port: 9092 # Kafka
- protocol: TCP
port: 2181 # Zookeeper
# 외부 서비스 통신 (다른 마이크로서비스)
- to:
- namespaceSelector:
matchLabels:
name: user-service
- namespaceSelector:
matchLabels:
name: product-service
- namespaceSelector:
matchLabels:
name: payment-service
ports:
- protocol: TCP
port: 8080
# DNS 조회 허용
- to: []
ports:
- protocol: UDP
port: 53
- protocol: TCP
port: 53
# HTTPS 외부 통신 (모니터링, 로깅 등)
- to: []
ports:
- protocol: TCP
port: 443
|
실무에서 효과적으로 적용하기 위한 고려사항 및 주의할 점#
카테고리 | 고려사항 / 주제 | 설명 | 권장사항 / 전략 |
---|
아키텍처 설계 | 서비스 경계 정의 | 비즈니스 도메인 기반의 독립적 서비스 분리 필요 | 도메인 주도 설계 (DDD), Bounded Context 적용 |
| 통신 방식 선택 | 서비스 간 통신 방식 (REST/gRPC 등) 에 따른 성능·복잡도 영향 | 고성능 요구 시 gRPC, API Gateway 활용 |
| 마이그레이션 전략 | 레거시 시스템과의 공존 및 단계적 전환 필요 | Strangler Fig 패턴, 하이브리드·멀티클라우드 전략 적용 |
CI/CD & 자동화 | 자동화된 배포 파이프라인 구축 | 신속하고 신뢰성 있는 배포 체계 필수 | GitOps, Jenkins, GitLab, Canary/Rolling 배포 적용 |
| 테스트 자동화 | 배포 전 안정성 확보 필요 | 통합 테스트, Mock/Stub 테스트, E2E 테스트 자동화 |
| IaC 도입 | 인프라 구성의 재현성과 자동화 필수 | Terraform, Pulumi 등 IaC 도구 활용 |
관찰가능성 | 통합 모니터링 구성 | 로그, 메트릭, 트레이싱 통합이 필수 | Prometheus, Grafana, ELK, Jaeger, OpenTelemetry 도입 |
| 장애 추적 및 분석 | 분산 환경에서 원인 탐색이 어려움 | 분산 추적 시스템 구축, APM 도구 활용 |
보안 | 서비스 간 통신 보안 | 네트워크 상에서의 안전한 데이터 전송 필요 | mTLS, RBAC, OPA, 네트워크 정책 구성 |
| 인증/인가 강화 | 리소스 접근 통제 및 최소 권한 정책 필요 | Zero Trust 모델, OAuth2, 정기 보안 감사 |
| DevSecOps 문화 정착 | 개발 - 운영 - 보안 연계의 자동화 및 조기 통합 | 보안 자동화 툴 통합, 이미지 스캔, SBoM 관리 |
조직 및 문화 | DevOps 문화 구축 | 사일로 조직 탈피 및 협업 문화 필요 | 크로스 펀셔널 팀 구성, 자율성 부여, 실패 수용 문화 조성 |
| 기술 학습 및 도입 관리 | 새로운 기술 도입 시 학습 곡선 존재 | 단계적 학습 프로그램, PoC 기반 적용, 내부 교육 체계 운영 |
운영 및 복원력 | 장애 대응 전략 | 서비스 실패 발생 시 자동 복구 필요 | Kubernetes Self-healing, Auto-scaling, Chaos Engineering 적용 |
| 시스템 복원력 검증 | 장애 상황에서의 탄력성 확보 | 카오스 테스트, 롤백 전략 수립, 인시던트 대응 프로세스 표준화 |
| 정기 아키텍처 리뷰 | 기술 부채 누적 방지 및 개선 | 리뷰 프로세스 도입, 기술 우선순위 정의, 기술 부채 관리 |
데이터 관리 | 일관성 모델 설계 | 분산 환경에서 ACID 유지의 어려움 | 최종 일관성 기반 설계, Event Sourcing, CQRS, SAGA 패턴 도입 |
| 데이터 보안 및 접근 제어 | 민감 정보 처리 시 암호화와 접근 통제가 필요 | 데이터 암호화, 비밀 관리 솔루션 (e.g., Vault), 감사 로깅 |
네트워크 및 메시 | 서비스 메시 구성 | 트래픽 제어 및 보안 정책 적용 필요 | Istio/Linkerd 기반 트래픽 제어, 정책 관리 |
| 네트워크 지연 최소화 | 서비스 간 통신으로 인한 레이턴시 증가 | 비동기 통신, 로컬 캐싱, 데이터 지역성 고려 |
비용 최적화 | 자원 사용 관리 | 과도한 리소스 사용은 비용 상승 유발 | HPA/VPA 기반 자동 스케일링, FinOps, 예약/스팟 인스턴스 전략 |
| 운영비용 모니터링 | 실시간 비용 추적 및 예측 필요 | 클라우드 비용 분석 도구 활용, 태그 기반 비용 추적, 알람 설정 |
적용 시 주의할 점 요약#
항목 | 주의 사항 | 회피 전략 |
---|
과도한 마이크로서비스 분해 | 복잡도 및 관리 포인트 증가 | 서비스 분리 기준 명확히 수립, 최소 실행 단위로 설계 |
중앙 집중식 조직 구조 | 자율성 결여, 병목 발생 가능성 | 도메인 기반 크로스 펀셔널 팀 구성 |
무계획 기술 도입 | 기술 부채 유발, 운영 복잡성 증가 | PoC 기반 검증 → 중요도 낮은 서비스부터 도입 |
비용 관리 부재 | 클라우드 사용량 증가로 인한 과금 급증 | 모니터링, 자동 종료 정책, 비용 리포팅 도입 |
보안 정책 일관성 부족 | 환경별 정책 상이, 보안 공백 발생 | 정책 템플릿화, 보안 자동화 및 DevSecOps 연동 |
클라우드 네이티브 환경에서 발생 가능한 문제#
문제 유형 | 원인 요약 | 탐지 및 진단 기법 | 예방 방법 | 해결 방법 |
---|
분산 시스템 장애 | - 네트워크 분할 - 서비스 간 의존성 연쇄 실패 - 부분 장애 | - Circuit Breaker 이벤트 로그 - 헬스체크 - 타임아웃 모니터링 | - 회로 차단기 패턴 (Circuit Breaker) - 재시도 + 백오프 - 헬스체크 엔드포인트 구축 | - Bulkhead 패턴 - 비동기 메시징 도입 - 자동 장애 조치 (Auto-failover) - 카오스 테스트 |
데이터 일관성 문제 | - 분산 트랜잭션 - 비동기 처리 지연 - 마이크로서비스별 데이터베이스 | - 무결성 검증 스크립트 - 이벤트 소싱 감사 로그 - 실시간 상태 감시 | - 이벤트 기반 처리 구조 SAGA 패턴 - 최종 일관성 기반 모델 도입 | - 보상 트랜잭션 실행 - 이벤트 재처리 - 수동 정합성 검증 및 수동 수정 |
컨테이너 보안 취약점 | - 취약 이미지 사용 - 과도 권한 설정 - 설정 오류 | - 이미지 스캔 도구 PodSecurityPolicy 감시 - 네트워크 정책 로그 | - 취약점 정기 스캔 - 최소 권한 원칙 적용 - 네트워크 격리 및 시크릿 암호화 | - 취약 컨테이너 격리 - 패치 이미지로 롤아웃 - 보안 사고 대응 프로세스 가동 |
리소스 고갈 (과부하) | - 리소스 제한 미설정 - 메모리 누수 - 트래픽 급증 | - 리소스 모니터링 (CPU/Mem) - 임계치 알람 설정 - 프로파일링 | - HPA 설정 - 리소스 요청/제한 설정 - 로드 테스트 주기 실행 | - 긴급 확장 (스케일업/스케일아웃) - 트래픽 제한 - 백그라운드 서비스 중단 |
참고 키워드 및 관련 기술#
개념 | 설명 | 주요 도구/패턴 |
---|
회로 차단기 패턴 | 장애 발생 시 일시적으로 요청 차단 | opossum , resilience4j , Hystrix |
Bulkhead 패턴 | 장애가 다른 영역으로 확산되지 않도록 격리 | Thread Pool, Worker Queue 등 격리 구간 운영 |
SAGA 패턴 | 분산 트랜잭션 대체 전략, 보상 트랜잭션 포함 | Event Choreography, Command Orchestration |
최종 일관성 모델 | eventual consistency 를 기본으로 동작하는 데이터 모델 | Event Sourcing, NoSQL |
PodSecurityPolicy | Kubernetes 의 기본적인 보안 정책 도구 | psp , OPA , Kyverno , PodSecurity |
HPA/VPA | 수평/수직 자동 스케일링 정책 | Kubernetes Autoscaler |
최적화하기 위한 고려사항 및 주의할 점#
카테고리 | 최적화 영역 | 고려사항 / 주의점 | 권장사항 또는 전략 |
---|
성능 최적화 | 캐싱 전략 | 캐시 무효화 복잡성, 불필요한 외부 호출 | 다층 캐시 구조, TTL 정책, Redis 활용 |
| 컨테이너 성능 | 오버헤드 증가, 이미지 부피 과다 | Multi-stage build, Alpine 기반 이미지, 성능 프로파일링 |
| JVM/Garbage GC | Java 기반 앱 성능 저하 가능 | GC 튜닝, JVM 설정 최적화 |
| 네트워크 지연 | 서비스 간 통신 지연, 사이드카 오버헤드 | gRPC 사용, 서비스 메시 최적화, 지역성 고려 배치 |
스케일링 최적화 | 오토스케일링 | 지연, 과다 확장, 과소 프로비저닝 | HPA/VPA 조합, 커스텀 메트릭 기반 스케일링, 예측 + 반응 조합 스케일링 |
| 상태 비저장화 | 세션 정보가 로컬에 존재 시 확장성 저하 | Redis 등 외부 세션 스토리지 사용 |
가용성 & 장애 대응 | 다중 리전 운영 | 데이터 동기화 지연, 장애 발생 시 복구 지연 | 비동기 복제, 멀티 AZ 배포, 자동 장애 조치 구성 |
| 장애 감지 및 회복 | 장애 전파, 롤백 실패 가능성 | 헬스체크, 롤링 업데이트, 서킷 브레이커 적용, 자동 복구 플로우 구성 |
보안 최적화 | 컨테이너 및 이미지 보안 | 런타임 공격, 취약한 이미지 사용 | 이미지 스캐닝, SBoM, 런타임 보안 모니터링 |
| 네트워크 보안 | 허술한 정책으로 인한 공격 노출 | Zero Trust 모델, mTLS, NetworkPolicy 적용 |
| 정책 및 인증 | RBAC/OAuth 설정 부재 또는 과도 설정 | 세밀한 RBAC 설계, 자동화된 보안 검사, OPA 기반 정책 구성 |
배포 전략 최적화 | 점진적 배포 | 블루/그린 배포 중 다운타임 발생 가능 | 카나리 배포, 트래픽 점진 전환, 자동 롤백 구성 |
| CI/CD 병목 | 빌드 시간 과도, 테스트 병렬화 실패 | 증분 빌드, 병렬 테스트 실행, 테스트 캐시 활용 |
| GitOps 배포 | 선언형과 실제 상태 불일치 | ArgoCD/Flux 상태 싱크 알림 설정, 자동 재조정 |
리소스 최적화 | 컨테이너 자원 할당 | 과소/과대 할당으로 인한 비용 증가 | 리소스 요청/제한 최적화, VPA 적용, 자원 프로파일링 |
| 비용 최적화 | 예약 인스턴스 누락, 스팟 인스턴스 미활용 | 스팟 인스턴스, 자동 종료 정책, 비용 추적 시스템 도입 |
데이터 최적화 | 파티셔닝 & 샤딩 | 핫스팟 발생, 쏠림 문제 | 데이터 샤딩, 해시 기반 로드 밸런싱, 분산 쿼리 최적화 |
| 이벤트 기반 저장 | 이벤트 일관성 유지, 스냅샷 관리 | Event Sourcing + CQRS 적용, 이벤트 리플레이 전략 수립 |
관찰가능성 최적화 | 메트릭 수집 | 메트릭 오버헤드, 무분별한 수집 | 샘플링, 메트릭 수명 관리, 핵심 메트릭만 집중 수집 |
| 로깅 | 과도한 로그로 인한 저장 비용 증가 | 로그 레벨 조정, Fluentd 필터링, 중앙화 로그 저장소 사용 |
| 분산 추적 | 스팬 누락, 성능 오버헤드 | Jaeger/Zipkin 샘플링 조정, context 전파 최적화 |
테스트 & 문서화 | 테스트 자동화 | E2E 테스트 병목, Mock/Stub 부족 | 테스트 병렬화, CI 통합, 테스트 시나리오 다양화 |
| 운영 문서화 | 문서 부재 또는 업데이트 누락 | Swagger/OpenAPI 활용, 운영 위키화, 자동 문서 생성 파이프라인 |
아키텍처 개선 | 기술 부채 누적 | 구식 라이브러리, 설계 오류 축적 | 정기 아키텍처 리뷰, 리팩토링 주기화, 신규 기술/패턴 도입 프로세스 운영 |
- HPA/VPA: Horizontal/Vertical Pod Autoscaler
- OPA: Open Policy Agent
- gRPC: 고성능 통신을 위한 Google RPC
- Service Mesh: Istio/Linkerd 등 통신 관리 계층
- Zero Trust: 인증 중심의 경계 없는 보안 모델
주제와 관련하여 주목할 내용들#
카테고리 | 항목 | 설명 |
---|
ア키텍처 원칙 | 12-Factor App | 클라우드 네이티브 환경에 최적화된 앱 개발 원칙 |
| 마이크로서비스 | 독립적으로 배포 및 확장 가능한 서비스 단위 구성 |
| 서버리스 (Serverless) | 인프라 추상화를 통해 코드 실행에 집중 |
| Cloud-Native | 클라우드 환경에 최적화된 아키텍처 스타일 |
| API Gateway | 서비스 통합 진입점으로 인증, 라우팅, 트래픽 제어 수행 |
컨테이너/인프라 | 컨테이너 (Container) | 실행 환경의 격리 및 이식성 제공 |
| Kubernetes | 컨테이너 오케스트레이션 플랫폼, 자동화된 배포/확장/복구 기능 제공 |
| Sidecar Pattern | 보조 기능을 제공하는 별도 컨테이너를 함께 배포하는 패턴 |
| Rook | Kubernetes 네이티브 분산 스토리지 운영 (Ceph 등 지원) |
| Vitess | MySQL 기반의 수평 확장 샤딩 솔루션 |
오케스트레이션 | KEDA | 이벤트 기반 Kubernetes 오토스케일링 도구 |
| Crossplane | 클라우드 인프라를 Kubernetes API 기반으로 관리 |
DevOps/자동화 | CI/CD | 지속적 통합/배포를 통해 자동화된 개발 파이프라인 구축 |
| ArgoCD + ApplicationSet | GitOps 기반 다중 클러스터 자동 배포 |
| Tekton | Kubernetes 네이티브 CI/CD 파이프라인 구성 |
| IaC (Infrastructure as Code) | Terraform, Pulumi 등으로 코드 기반 인프라 자동화 |
| GitOps | Git 저장소 기반 선언형 배포 방식 |
서비스 메시 | Istio | 트래픽 제어, 보안, 관측성 등을 위한 강력한 메시 인프라 |
| Ambient Mesh | 사이드카 없는 경량 메시 아키텍처 구현 (Istio 기반) |
| Linkerd | 경량화된 서비스 메시, Rust 기반 프록시 (Linkerd2-proxy) 포함 |
보안 | Zero Trust | 내부/외부 구분 없는 무조건 검증 기반의 보안 모델 |
| mTLS (Mutual TLS) | 클라이언트와 서버 간의 상호 인증 제공 |
| OPA Gatekeeper | Kubernetes 정책을 코드로 정의하고 통제 (Policy as Code) |
| Falco | 컨테이너 런타임 보안 모니터링 및 위협 탐지 |
관찰가능성 | Observability Stack | Prometheus, Grafana, ELK 등 통합 모니터링 체계 |
| OpenTelemetry | 분산 트레이싱, 메트릭, 로그 수집을 위한 표준 오픈소스 프레임워크 |
| Auto-instrumentation | 코드 변경 없이 자동 계측 기능 지원 |
| Jaeger + Adaptive Sampling | 트래픽 양에 따른 동적 샘플링을 통해 효율적 분산 추적 가능 |
네트워킹 | Cilium | eBPF 기반 고성능 CNI (네트워크 플러그인), 관측성 및 보안 기능 강화 |
| Envoy Gateway | Envoy 기반 Kubernetes 네이티브 API Gateway |
데이터 | Apache Pinot | 실시간 분석용 OLAP 엔진 (초고속 쿼리 처리) |
| CQRS | 명령과 조회 책임을 분리하여 성능과 확장성 개선 |
| Event Sourcing | 상태 변경 이력을 이벤트 시퀀스로 저장 |
메시징 | NATS | 경량, 고성능의 클라우드 네이티브 메시징 시스템 |
주제 관련 추가 학습 필요 항목#
대분류 | 주제 | 세부 항목 | 설명 |
---|
개발 원칙 | 애플리케이션 설계 | 12-Factor App | 클라우드 환경에 최적화된 앱 설계 원칙 (환경 구성, 릴리즈, 로깅 등) |
| 마이크로서비스 설계 | 단일 책임, 느슨한 결합, 높은 응집도 | 독립적 배포와 유지보수를 위한 서비스 분해 원칙 |
| API 설계 | REST, GraphQL, API 버저닝 등 | 확장성과 보안 고려한 API 중심 설계 전략 |
인프라 관리 | IaC | Terraform, Pulumi, Ansible | 인프라 코드화를 통한 자동화, 버전관리 |
| 클라우드 플랫폼 | AWS, Azure, GCP | 주요 클라우드 환경에서 제공하는 관리형 서비스 활용 |
| 오케스트레이션 | Kubernetes, containerd | 컨테이너 스케줄링, 서비스 배포/확장/복구 등 관리 |
운영 모델 | GitOps | ArgoCD, Flux | Git 기반 선언형 배포 모델로 운영 일관성 확보 |
| CI/CD 파이프라인 | Jenkins, GitLab CI 등 | 코드 빌드, 테스트, 배포 자동화 |
보안 | 제로 트러스트 | mTLS, RBAC, 정책 엔진 등 | 인증·인가·통신 보안 강화 (서비스 간 보안 경계 강화) |
| 컨테이너 보안 | 이미지 스캔, 런타임 보안 도구 | 보안 위협 대응을 위한 컨테이너 기반 공격 탐지 및 방어 |
| 정책 기반 보안 | OPA, Kyverno | 보안 정책 적용 및 검증 자동화 |
데이터 처리 | 이벤트 기반 처리 | Event Sourcing, CQRS | 상태 변경 이벤트 기록 기반 아키텍처 (복구, 트레이싱) |
| 분산 데이터 관리 | CAP 이론, BASE/ACID 트랜잭션 | 데이터 일관성 모델, 분산 트랜잭션 처리 전략 |
네트워킹 | 서비스 디스커버리 | DNS, Consul, Service Mesh | 동적 서비스 탐색 및 로드 밸런싱 구현 |
| 서비스 메시 | Istio, Linkerd | 트래픽 라우팅, 관찰가능성, mTLS 기반 보안 통신 |
관찰가능성 | 메트릭 수집 | Prometheus, Grafana | 메트릭 기반 모니터링 및 시각화 |
| 로깅 | ELK Stack, Fluentd | 로그 수집, 중앙화, 파싱 및 검색 |
| 분산 추적 | Jaeger, Zipkin | 트랜잭션 추적 및 성능 병목 지점 분석 |
서비스 모델 | 서버리스 | AWS Lambda, Google Cloud Func | 이벤트 기반 실행, 인프라 관리 최소화 |
| 하이브리드/멀티클라우드 | 온프레미스 + 퍼블릭 클라우드 | 여러 클라우드 환경 통합 운영 전략 |
보안운영 | DevSecOps | 자동화된 보안, 취약점 분석 | CI/CD 파이프라인에서 보안 검사 자동화, 통합 운영 |
용어 정리#
카테고리 | 용어 | 설명 |
---|
아키텍처 스타일 | 클라우드 네이티브 (Cloud-Native) | 클라우드에 최적화된 애플리케이션 아키텍처 설계 및 운영 방식 |
| 마이크로서비스 (Microservices) | 독립적으로 배포 가능한 작고 기능 중심의 서비스 단위로 구성된 아키텍처 |
| 서버리스 (Serverless) | 서버 관리 없이 이벤트 기반으로 코드 실행하는 컴퓨팅 모델 |
| API 게이트웨이 (API Gateway) | 클라이언트 요청을 통합 처리하는 단일 진입점 서비스 |
| 서비스 메시 (Service Mesh) | 마이크로서비스 간의 통신, 보안, 트래픽을 제어하는 인프라 계층 |
| 사이드카 패턴 (Sidecar Pattern) | 애플리케이션과 함께 배포되는 독립 실행 보조 컨테이너 |
| 스트랭글러 피그 (Strangler Fig) | 레거시 시스템을 점진적으로 대체하는 마이그레이션 전략 |
운영 방법론 | DevOps | 개발과 운영의 통합 및 자동화를 통한 협업 중심 개발 문화 |
| GitOps | Git 저장소를 운영 소스로 삼아 인프라와 앱을 선언형으로 자동 배포하는 방식 |
| 불변 인프라 (Immutable Infrastructure) | 인프라 구성 변경 대신 전체 교체로 상태 일관성을 유지하는 방식 |
| CI/CD | 지속적 통합 및 지속적 배포를 자동화하는 개발 파이프라인 |
| IaC (Infrastructure as Code) | 인프라 구성을 코드로 정의하고 관리하는 방법 |
컨테이너 기술 | 컨테이너 (Container) | 실행 환경을 이식성과 일관성 있게 제공하는 경량화된 패키징 기술 |
| Kubernetes | 컨테이너 오케스트레이션 플랫폼으로 자동 배포, 확장, 복구 지원 |
| 오케스트레이션 (Orchestration) | 컨테이너의 전체 라이프사이클 (배포, 확장, 복구 등) 자동화 관리 |
배포 전략 | 카나리 배포 (Canary Deployment) | 일부 사용자에게만 새 버전을 배포해 위험을 점진적으로 평가하는 전략 |
| 블루 - 그린 배포 (Blue-Green Deployment) | 두 개의 환경을 번갈아 사용하여 배포 중 다운타임을 최소화하는 전략 |
| | |
모니터링/관찰성 | 관찰가능성 (Observability) | 로그, 메트릭, 트레이싱 등으로 시스템 내부 상태를 추론하는 능력 |
| 분산 추적 (Distributed Tracing) | 여러 서비스에 걸친 요청 경로 추적 (예: OpenTelemetry, Jaeger) |
| SLI / SLO / SLA | 서비스 품질 관리 지표: 지표/목표/계약 기반의 가시성 제공 |
보안 | 제로 트러스트 (Zero Trust) | 내부/외부 구분 없이 모든 접속을 검증하는 보안 모델 |
| mTLS (Mutual TLS) | 클라이언트 - 서버 간 상호 인증을 지원하는 TLS 프로토콜 |
| RBAC (Role-Based Access Control) | 역할 기반 권한 부여 시스템으로 리소스 접근 제어 |
데이터 아키텍처 | 이벤트 소싱 (Event Sourcing) | 상태 변경을 이벤트로 저장하고 복원하는 방식 |
| CQRS | Command(쓰기) 와 Query(읽기) 의 책임을 분리하는 아키텍처 패턴 |
| 결과적 일관성 (Eventual Consistency) | 분산 환경에서 일정 시간 후에 데이터 일관성을 보장하는 모델 |
| SAGA 패턴 | 마이크로서비스 간 분산 트랜잭션을 관리하는 패턴 |
복원력/장애대응 | 회로 차단기 (Circuit Breaker) | 장애 전파를 방지하기 위한 트래픽 차단 및 복구 전략 |
| 벌크헤드 (Bulkhead) | 시스템을 격리해 특정 부분의 장애가 전체로 확산되는 것을 방지하는 구조 |
참고 및 출처#
클라우드 네이티브 아키텍처 개요 및 원칙#
마이크로서비스 및 구현 사례#
기술 요소 및 플랫폼#
보안 및 표준#
아키텍처 레퍼런스 및 설문 보고서#