Monitoring

API 모니터링은 API의 가용성, 성능, 기능적 정확성을 지속적으로 관찰하고 평가하는 체계적인 프로세스이다. 이는 현대 소프트웨어 시스템의 안정성과 신뢰성을 보장하는 데 필수적인 요소로, 문제를 조기에 감지하고 해결함으로써 서비스 중단과 사용자 경험 저하를 방지한다.

API 모니터링의 기본 개념

API 모니터링은 단순한 로그 수집을 넘어서, API 시스템의 건강 상태를 종합적으로 관찰하고 평가하는 프로세스이다. 이는 다음과 같은 핵심 측면을 포함한다:

  1. 가용성 모니터링: API가 지속적으로 응답하며 접근 가능한지 확인한다.
  2. 성능 모니터링: 응답 시간, 처리량, 오류율 등의 성능 지표를 추적한다.
  3. 기능적 모니터링: API가 예상대로 정확한 결과를 반환하는지 검증한다.
  4. 인프라 모니터링: API를 지원하는 기본 인프라의 상태를 관찰한다.
  5. 보안 모니터링: 비정상적인 접근 패턴이나 보안 위협을 감지한다.

API 모니터링의 중요성

API 모니터링이 비즈니스와 기술적 측면에서 제공하는 가치는 다음과 같다:

  1. 서비스 신뢰성 보장: 지속적인 모니터링을 통해 API 장애를 빠르게 감지하고 대응함으로써 서비스 중단 시간을 최소화한다.
  2. 사용자 경험 향상: 성능 저하 문제를 조기에 식별하여 사용자 만족도를 유지하고 개선한다.
  3. 프로액티브한 문제 해결: 문제가 심각해지기 전에 조기 경고 신호를 통해 선제적으로 대응할 수 있다.
  4. 용량 계획 및 확장성: 트래픽 패턴과 리소스 사용량을 분석하여 적절한 용량 계획을 수립할 수 있다.
  5. SLA(서비스 수준 계약) 준수: 서비스 수준 목표를 지속적으로 측정하고 보고하여 계약 준수를 보장한다.
  6. 비즈니스 인텔리전스: API 사용 패턴을 분석하여 비즈니스 통찰력을 얻을 수 있다.

API 모니터링의 핵심 지표

효과적인 API 모니터링을 위해 추적해야 할 핵심 지표들은 다음과 같다:

가용성 지표

  • 업타임(Uptime): API 서비스의 가용 시간 비율
  • 가동 시간(Uptime Duration): 연속적인 서비스 가동 시간
  • 장애 빈도(Failure Rate): 일정 기간 동안 발생한 장애의 수
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# API 가용성 계산 예시
def calculate_availability(total_requests, failed_requests):
    if total_requests == 0:
        return 100.0  # 요청이 없으면 100% 가용성으로 간주
    
    availability = ((total_requests - failed_requests) / total_requests) * 100
    return round(availability, 2)  # 소수점 둘째 자리까지 반올림

# 9s 가용성 수준 정의
availability_levels = {
    "two_nines": 99.0,    # 가동 중지 시간: 월간 약 7.2시간
    "three_nines": 99.9,  # 가동 중지 시간: 월간 약 43.2분
    "four_nines": 99.99,  # 가동 중지 시간: 월간 약 4.32분
    "five_nines": 99.999  # 가동 중지 시간: 월간 약 26초
}

성능 지표

  • 응답 시간(Response Time): 요청을 처리하는 데 걸리는 시간
  • 지연 시간(Latency): 네트워크 또는 처리 지연 시간
  • 처리량(Throughput): 단위 시간당 처리된 요청 수
  • 요청 속도(Request Rate): 초당 들어오는 요청의 수
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Express.js에서 응답 시간 모니터링 미들웨어 예시
const responseTimeMiddleware = (req, res, next) => {
  const start = process.hrtime();
  
  res.on('finish', () => {
    const end = process.hrtime(start);
    const duration = (end[0] * 1e9 + end[1]) / 1e6; // 밀리초로 변환
    
    // 메트릭 기록 (예: Prometheus, StatsD 등에 전송)
    recordMetric('api.response_time', {
      method: req.method,
      path: req.route ? req.route.path : req.path,
      status: res.statusCode,
      duration: duration
    });
    
    // 응답 시간이 임계값을 초과하면 알림
    if (duration > 500) {  // 500ms = 0.5초
      notifySlowResponse({
        method: req.method,
        path: req.path,
        duration: duration,
        timestamp: new Date()
      });
    }
  });
  
  next();
};

오류 지표

  • 오류율(Error Rate): 전체 요청 중 오류 응답의 비율
  • HTTP 상태 코드 분포: 각 상태 코드(2xx, 4xx, 5xx 등)의 분포
  • 오류 유형 분류: 발생하는 오류의 유형별 분류와 빈도
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# FastAPI에서 오류 모니터링 예시
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import time
import logging
from collections import defaultdict

app = FastAPI()
error_counts = defaultdict(int)
error_logger = logging.getLogger("api_errors")

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    error_type = "validation_error"
    error_counts[error_type] += 1
    
    error_logger.error(
        f"Validation error: endpoint={request.url.path}, client_ip={request.client.host}, "
        f"error_detail={exc.errors()}"
    )
    
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={"detail": exc.errors()}
    )

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    error_type = type(exc).__name__
    error_counts[error_type] += 1
    
    error_logger.error(
        f"Unhandled exception: endpoint={request.url.path}, client_ip={request.client.host}, "
        f"error_type={error_type}, error_message={str(exc)}"
    )
    
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"detail": "Internal server error"}
    )

# 정기적으로 오류 통계 보고
@app.on_event("startup")
async def start_error_reporting():
    async def report_errors():
        while True:
            await asyncio.sleep(300)  # 5분마다 보고
            
            if error_counts:
                error_logger.info(f"Error summary (last 5 minutes): {dict(error_counts)}")
                # 메트릭 시스템에 데이터 전송
                for error_type, count in error_counts.items():
                    send_metric(f"api.error.{error_type}", count)
                
                # 카운터 초기화
                error_counts.clear()
    
    asyncio.create_task(report_errors())

리소스 사용 지표

  • CPU 사용률: API 서버의 CPU 사용 수준
  • 메모리 사용량: 사용 중인 메모리 양과 패턴
  • 디스크 I/O: 디스크 읽기/쓰기 작업의 양과 지연 시간
  • 네트워크 사용량: 입출력 네트워크 트래픽 양

비즈니스 지표

  • API 호출 볼륨: 시간대별, 엔드포인트별 호출 수
  • 사용자별 API 사용량: 개별 사용자 또는 클라이언트별 사용 패턴
  • 비즈니스 트랜잭션 성공률: 비즈니스 관점에서의 성공적인 트랜잭션 비율
  • 수익 관련 지표: API 사용과 직접 연관된 수익 지표

API 모니터링 구현 방법

합성 모니터링(Synthetic Monitoring)

합성 모니터링은 사전 정의된 시나리오를 주기적으로 실행하여 API의 가용성과 성능을 검증하는 방법이다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Puppeteer를 사용한 합성 모니터링 예시
const puppeteer = require('puppeteer');
const axios = require('axios');

async function runSyntheticTest() {
  const startTime = Date.now();
  let success = false;
  let responseTime = 0;
  let errorMessage = null;
  
  try {
    // API 호출 시뮬레이션
    const response = await axios.get('https://api.example.com/health', {
      timeout: 5000  // 5초 타임아웃
    });
    
    responseTime = Date.now() - startTime;
    success = response.status === 200;
    
    // JWT 인증이 필요한 API 흐름 검증
    const token = await getAuthToken();
    const userResponse = await axios.get('https://api.example.com/users/me', {
      headers: { 'Authorization': `Bearer ${token}` }
    });
    
    // 응답 데이터 유효성 검사
    if (!userResponse.data.id) {
      throw new Error('User data is invalid');
    }
    
  } catch (error) {
    success = false;
    errorMessage = error.message;
    responseTime = Date.now() - startTime;
  }
  
  // 결과 보고
  reportTestResult({
    testName: 'API Health Check',
    success,
    responseTime,
    errorMessage,
    timestamp: new Date()
  });
}

// 스케줄링: 5분마다 테스트 실행
setInterval(runSyntheticTest, 5 * 60 * 1000);

합성 모니터링의 주요 장점:

  • 사용자 흐름을 종단간(end-to-end) 테스트할 수 있음
  • 실제 사용자가 없더라도 모니터링 가능
  • 예정된 중단 시간 동안 비활성화 가능

실제 사용자 모니터링(Real User Monitoring, RUM)

실제 사용자의 API 상호작용을 모니터링하여 실제 환경에서의 성능과 사용자 경험을 측정한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// 클라이언트 측 RUM 구현 예시 (브라우저)
class APIMonitor {
  constructor() {
    this.metrics = {
      calls: 0,
      errors: 0,
      totalResponseTime: 0
    };
    
    this.initialize();
  }
  
  initialize() {
    // 원본 fetch 메서드를 가로채기
    const originalFetch = window.fetch;
    
    window.fetch = async (url, options) => {
      // API 호출만 모니터링 (자체 도메인의 /api 경로)
      if (url.includes('/api')) {
        const start = performance.now();
        let error = false;
        
        try {
          const response = await originalFetch(url, options);
          if (!response.ok) {
            error = true;
          }
          
          const end = performance.now();
          this.recordMetrics(url, end - start, error);
          
          return response;
        } catch (err) {
          const end = performance.now();
          this.recordMetrics(url, end - start, true);
          throw err;
        }
      } else {
        // API가 아닌 경우 원본 fetch 실행
        return originalFetch(url, options);
      }
    };
    
    // 주기적으로 메트릭 전송
    setInterval(() => this.sendMetrics(), 60000);
  }
  
  recordMetrics(url, responseTime, isError) {
    this.metrics.calls++;
    this.metrics.totalResponseTime += responseTime;
    
    if (isError) {
      this.metrics.errors++;
    }
    
    // 개별 엔드포인트별 메트릭 기록도 가능
  }
  
  sendMetrics() {
    if (this.metrics.calls > 0) {
      const avgResponseTime = this.metrics.totalResponseTime / this.metrics.calls;
      const errorRate = (this.metrics.errors / this.metrics.calls) * 100;
      
      // 백엔드 애널리틱스 시스템으로 메트릭 전송
      navigator.sendBeacon('/analytics/api-metrics', JSON.stringify({
        calls: this.metrics.calls,
        avgResponseTime,
        errorRate,
        userAgent: navigator.userAgent,
        timestamp: Date.now()
      }));
      
      // 메트릭 초기화
      this.metrics = {
        calls: 0,
        errors: 0,
        totalResponseTime: 0
      };
    }
  }
}

// 모니터링 시작
const apiMonitor = new APIMonitor();

RUM의 주요 장점:

  • 실제 사용자 경험을 직접 측정
  • 지역, 디바이스, 브라우저별 성능 차이 식별 가능
  • 실제 사용 패턴을 기반으로 한 통찰력 제공

로그 기반 모니터링

API 로그를 수집, 분석하여 성능과 오류 패턴을 파악한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Python/Flask에서의 구조화된 로깅 예시
import logging
import json
import time
import uuid
from flask import Flask, request, g

app = Flask(__name__)

# JSON 형식 로거 설정
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name
        }
        
        # 추가 컨텍스트 정보가 있으면 포함
        if hasattr(record, 'request_id'):
            log_record["request_id"] = record.request_id
        
        if hasattr(record, 'path'):
            log_record["path"] = record.path
        
        if hasattr(record, 'method'):
            log_record["method"] = record.method
        
        if hasattr(record, 'duration_ms'):
            log_record["duration_ms"] = record.duration_ms
        
        if hasattr(record, 'status_code'):
            log_record["status_code"] = record.status_code
        
        if hasattr(record, 'user_id'):
            log_record["user_id"] = record.user_id
        
        return json.dumps(log_record)

# 로거 설정
logger = logging.getLogger("api")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# 요청/응답 로깅 미들웨어
@app.before_request
def before_request():
    g.start_time = time.time()
    g.request_id = str(uuid.uuid4())

@app.after_request
def after_request(response):
    duration = time.time() - g.start_time
    
    # 구조화된 로그 레코드 생성
    logger.info(
        f"API Request Completed",
        extra={
            "request_id": g.request_id,
            "method": request.method,
            "path": request.path,
            "status_code": response.status_code,
            "duration_ms": round(duration * 1000, 2),
            "user_id": g.get('user_id', 'anonymous')
        }
    )
    
    return response

# 오류 로깅
@app.errorhandler(Exception)
def handle_exception(e):
    logger.error(
        f"API Error: {str(e)}",
        extra={
            "request_id": g.get('request_id', 'unknown'),
            "method": request.method,
            "path": request.path,
            "error_type": type(e).__name__
        },
        exc_info=True
    )
    
    return {"error": "Internal Server Error"}, 500

로그 기반 모니터링의 주요 장점:

  • 오류의 상세한 컨텍스트 제공
  • 디버깅을 위한 풍부한 정보 제공
  • 과거 문제 분석 및 트렌드 파악 가능

메트릭 기반 모니터링

API의 성능과 동작에 관한 측정 가능한 수치 데이터를 수집하고 분석한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Prometheus 클라이언트를 사용한 메트릭 모니터링 예시
from flask import Flask, request
from prometheus_client import Counter, Histogram, start_http_server
import time

app = Flask(__name__)

# 메트릭 정의
REQUEST_COUNT = Counter(
    'api_requests_total',
    'Total API Request Count',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'api_request_latency_seconds',
    'API Request Latency',
    ['method', 'endpoint']
)

# 메트릭 수집 미들웨어
@app.before_request
def before_request():
    request.start_time = time.time()

@app.after_request
def after_request(response):
    request_latency = time.time() - request.start_time
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.path
    ).observe(request_latency)
    
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.path,
        status=response.status_code
    ).inc()
    
    return response

# Prometheus 메트릭 서버 시작
@app.before_first_request
def start_metrics_server():
    start_http_server(8000)  # 메트릭을 8000 포트에서 제공

@app.route('/api/user/<user_id>')
def get_user(user_id):
    # API 로직...
    return {"user": {"id": user_id, "name": "Test User"}}

메트릭 기반 모니터링의 주요 장점:

  • 실시간 성능 통찰 제공
  • 알림 및 자동화된 응답 가능
  • 장기적인 성능 추세 분석 용이

분산 추적(Distributed Tracing)

마이크로서비스 아키텍처에서 API 요청이 여러 서비스를 통과하는 경로를 추적한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# OpenTelemetry를 사용한 분산 추적 예시
from flask import Flask
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
import requests

# 트레이싱 설정
resource = Resource(attributes={SERVICE_NAME: "user-api"})
tracer_provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317"))
tracer_provider.add_span_processor(processor)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)

app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()

@app.route('/api/user-activity/<user_id>')
def get_user_activity(user_id):
    with tracer.start_as_current_span("get_user_details") as span:
        span.set_attribute("user.id", user_id)
        
        # 사용자 세부 정보 가져오기
        user_response = requests.get(f"http://user-service/users/{user_id}")
        user_data = user_response.json()
        
        with tracer.start_as_current_span("get_activity_data") as activity_span:
            # 사용자 활동 데이터 가져오기
            activity_response = requests.get(f"http://activity-service/activities?user_id={user_id}")
            activity_data = activity_response.json()
            activity_span.set_attribute("activity.count", len(activity_data))
        
        # 응답 데이터 조합
        result = {
            "user": user_data,
            "recent_activities": activity_data[:5]
        }
        
        return result

분산 추적의 주요 장점:

  • 전체 시스템에 걸친 요청 흐름 가시화
  • 서비스 간 지연 시간 파악
  • 시스템 병목 지점 정확히 식별

API 모니터링 도구 및 플랫폼

1. 오픈소스 모니터링 도구

  1. Prometheus + Grafana:
    • 메트릭 수집, 저장, 쿼리 및 시각화를 위한 강력한 조합
    • 강력한 알림 및 쿼리 언어 제공
    • Kubernetes와의 원활한 통합
  2. ELK 스택(Elasticsearch, Logstash, Kibana):
    • 로그 수집, 저장, 검색 및 시각화를 위한 통합 솔루션
    • 비정형 로그 데이터 분석에 적합
    • 강력한 전문 검색 기능
  3. Jaeger/Zipkin:
    • 분산 시스템을 위한 추적 도구
    • 요청 흐름 시각화 및 병목 현상 식별
    • OpenTelemetry와 통합

2. 상용 모니터링 솔루션

  1. Datadog:
    • 종합적인 모니터링 및 분석 플랫폼
    • API, 인프라, 로그, 사용자 경험을 통합적으로 모니터링
    • 강력한 이상 탐지 및 알림 기능
  2. New Relic:
    • 애플리케이션 성능 모니터링(APM) 중심의 플랫폼
    • 상세한 트랜잭션 추적 및 성능 분석
    • 실시간 대시보드 및 알림
  3. Dynatrace:
    • AI 기반 모니터링 및 관찰성 플랫폼
    • 자동화된 문제 발견 및 근본 원인 분석
    • 전체 스택 모니터링

3. 클라우드 제공업체 도구

  1. AWS CloudWatch:
    • AWS 서비스와 통합된 모니터링 솔루션
    • API Gateway, Lambda 등의 서비스 모니터링
    • 사용자 지정 메트릭 및 알림 지원
  2. Google Cloud Monitoring:
    • Google Cloud 서비스 모니터링
    • API 성능 및 오류 추적
    • 통합 대시보드 및 알림
  3. Azure Monitor:
    • Azure 서비스 모니터링
    • Application Insights를 통한 API 추적
    • 종합적인 로그 분석

API 모니터링 구현 모범 사례

다층적 모니터링 전략

효과적인 API 모니터링은 여러 계층에서 접근해야 한다:

  1. 인프라 계층: 서버, 네트워크, 컨테이너 상태 모니터링
  2. 애플리케이션 계층: API 서비스 자체의 성능과 동작 모니터링
  3. 비즈니스 계층: 비즈니스 프로세스와 트랜잭션 성공률 모니터링
  4. 사용자 경험 계층: 클라이언트 측에서의 API 상호작용 모니터링
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Prometheus 알림 규칙 예시 (다층적 접근)
groups:
  - name: api_alerts
    rules:
      # 인프라 계층 알림
      - alert: HighCpuUsage
        expr: avg(rate(node_cpu_seconds_total{mode!="idle"}[5m])) by (instance) > 0.8
        for: 5m
        labels:
          severity: warning
          layer: infrastructure
        annotations:
          summary: "High CPU usage on {{ $labels.instance }}"
          description: "CPU usage is above 80% for 5 minutes"

      # 애플리케이션 계층 알림
      - alert: ApiHighErrorRate
        expr: sum(rate(api_requests_total{status=~"5.."}[5m])) / sum(rate(api_requests_total[5m])) > 0.05
        for: 2m
        labels:
          severity: critical
          layer: application
        annotations:
          summary: "API error rate is high"
          description: "Error rate is above 5% for 2 minutes"

      # 비즈니스 계층 알림
      - alert: LowTransactionSuccessRate
        expr: sum(rate(business_transactions_success_total[5m])) / sum(rate(business_transactions_total[5m])) < 0.95
        for: 5m
        labels:
          severity: critical
          layer: business
        annotations:
          summary: "Business transaction success rate is low"
          description: "Transaction success rate is below 95% for 5 minutes"

효과적인 알림 설계

알림 피로를 방지하고 중요한 문제에 집중할 수 있는 알림 전략을 구현한다:

  1. 임계값 기반 알림: 주요 지표가 사전 정의된 임계값을 초과할 때 알림
  2. 추세 기반 알림: 비정상적인 패턴이나 갑작스러운 변화 감지 시 알림
  3. 우선순위 기반 라우팅: 알림의 심각도에 따라 적절한 대응 채널로 라우팅
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# 알림 임계값 설정 및 에스컬레이션 예시
class AlertManager:
    def __init__(self):
        self.thresholds = {
            'response_time': {
                'warning': 500,    # 500ms
                'critical': 1000   # 1초
            },
            'error_rate': {
                'warning': 0.01,   # 1%
                'critical': 0.05   # 5%
            },
            'request_rate': {
                'warning': 1000,   # 초당 1000 요청
                'critical': 1500   # 초당 1500 요청
            }
        }
        
        self.escalation_policies = {
            'warning': ['slack-channel'],
            'critical': ['slack-channel', 'pagerduty', 'sms']
        }
        
        # 알림 발생 이력 (알림 폭풍 방지)
        self.alert_history = {}
        
    def check_and_alert(self, metric_name, value, context=None):
        if metric_name not in self.thresholds:
            return
        
        severity = None
        if value >= self.thresholds[metric_name]['critical']:
            severity = 'critical'
        elif value >= self.thresholds[metric_name]['warning']:
            severity = 'warning'
        
        if severity:
            # 알림 폭풍 방지: 최근에 비슷한 알림이 발생했는지 확인
            alert_key = f"{metric_name}:{severity}"
            current_time = time.time()
            
            # 지난 10분 이내에 알림이 발생했는지 확인
            if (alert_key in self.alert_history and 
                current_time - self.alert_history[alert_key] < 600):
                # 이미 알림이 발생했으므로 무시
                return
            
            # 알림 발생 시간 기록
            self.alert_history[alert_key] = current_time
            
            # 알림 생성 및 전송
            alert = {
                'metric': metric_name,
                'value': value,
                'threshold': self.thresholds[metric_name][severity],
                'severity': severity,
                'timestamp': current_time,
                'context': context or {}
            }
            
            self.send_alert(alert, self.escalation_policies[severity])
    
    def send_alert(self, alert, channels):
        for channel in channels:
            if channel == 'slack-channel':
                self.send_to_slack(alert)
            elif channel == 'pagerduty':
                self.send_to_pagerduty(alert)
            elif channel == 'sms':
                self.send_sms(alert)

전체 API 수명 주기 모니터링

API의 모든 생명 주기 단계를 모니터링하는 것이 중요하다:

  1. 개발 단계: 성능 테스트 및 코드 품질 모니터링
  2. 배포 단계: 배포 성공률 및 서비스 중단 모니터링
  3. 운영 단계: 실시간 성능 및 가용성 모니터링
  4. 폐기 단계: 레거시 API 사용 현황 및 마이그레이션 진행 상황 모니터링
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# API 버전별 사용 현황 모니터링 예시
def track_api_version_usage(request, response):
    """API 버전 사용 현황을 모니터링하는 미들웨어"""
    version = get_api_version(request)
    
    # 버전별 사용량 메트릭 기록
    increment_counter('api.version.usage', tags={'version': version})
    
    # 폐기 예정 버전에 대한 경고 로깅
    if version in DEPRECATED_VERSIONS:
        log.warning(
            "Deprecated API version used",
            extra={
                'version': version,
                'client_id': get_client_id(request),
                'endpoint': request.path,
                'sunset_date': SUNSET_DATES.get(version, 'unknown')
            }
        )
        
        # 응답 헤더에 폐기 관련 정보 추가
        sunset_date = SUNSET_DATES.get(version)
        if sunset_date:
            response.headers['Sunset'] = sunset_date
            response.headers['Deprecation'] = 'true'
            response.headers['Link'] = f'<{MIGRATION_GUIDES[version]}>; rel="deprecation"; type="text/html"'
    
    return response

사용자 정의 건강 검사(Health Checks) 구현

API의 핵심 기능과 종속성을 주기적으로 검사하여 전체 시스템의 건강 상태를 모니터링한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Flask에서의 종합적인 건강 검사 API 구현
from flask import Flask, jsonify
import redis
import psycopg2
import requests
import time

app = Flask(__name__)

def check_database():
    """데이터베이스 연결 및 기본 쿼리 건강 검사"""
    try:
        conn = psycopg2.connect(DATABASE_URL)
        cursor = conn.cursor()
        cursor.execute("SELECT 1")  # 간단한 쿼리로 연결 확인
        cursor.close()
        conn.close()
        return True, "Database connection is healthy"
    except Exception as e:
        return False, f"Database connection failed: {str(e)}"

def check_redis():
    """Redis 연결 건강 검사"""
    try:
        redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
        redis_client.ping()  # PING 명령으로 연결 확인
        return True, "Redis connection is healthy"
    except Exception as e:
        return False, f"Redis connection failed: {str(e)}"

def check_external_api():
    """외부 의존성 API 건강 검사"""
    try:
        response = requests.get(
            "https://external-api.example.com/health",
            timeout=3  # 3초 타임아웃 설정
        )
        if response.status_code == 200:
            return True, "External API is reachable"
        else:
            return False, f"External API returned status code {response.status_code}"
    except Exception as e:
        return False, f"External API check failed: {str(e)}"

@app.route('/health')
def basic_health_check():
    """기본 가용성 확인을 위한 간단한 건강 검사"""
    return jsonify({"status": "UP"})

@app.route('/health/detailed')
def detailed_health_check():
    """상세한, 종합적인 건강 검사"""
    start_time = time.time()
    
    checks = {
        "database": check_database(),
        "redis": check_redis(),
        "external_api": check_external_api()
    }
    
    # 전체 상태 계산
    all_healthy = all(result[0] for result in checks.values())
    overall_status = "UP" if all_healthy else "DOWN"
    
    # 응답 형식화
    response = {
        "status": overall_status,
        "checks": {
            name: {
                "status": "UP" if result[0] else "DOWN",
                "message": result[1]
            }
            for name, result in checks.items()
        },
        "time": round((time.time() - start_time) * 1000, 2)  # ms 단위 응답 시간
    }
    
    status_code = 200 if all_healthy else 503  # 서비스 사용 불가 상태 코드
    return jsonify(response), status_code

API 모니터링의 도전 과제와 해결 방법

데이터 볼륨 관리

대규모 API는 방대한 양의 모니터링 데이터를 생성하며, 이를 효과적으로 관리해야 한다.

해결 방법:

  • 데이터 샘플링: 전체 트래픽의 일부만 상세히 추적
  • 집계 전략: 원시 데이터를 의미 있는 통계로 축소
  • TTL(Time-to-Live) 정책: 오래된 데이터 자동 삭제
  • 계층적 스토리지: 최신 데이터는 고속 스토리지에, 오래된 데이터는 저비용 스토리지에 보관
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 샘플링 구현 예시 (Node.js)
function shouldSample(requestId) {
  // 현재 샘플링 비율 (환경 변수나 설정에서 가져옴)
  const samplingRate = parseFloat(process.env.TRACING_SAMPLING_RATE || "0.1");  // 기본값 10%
  
  // requestId의 해시값을 계산하여 0-1 사이의 값으로 정규화
  const hash = calculateHash(requestId);
  const normalizedHash = hash / Number.MAX_SAFE_INTEGER;
  
  // 정규화된 해시값이 샘플링 비율보다 작으면 샘플링
  return normalizedHash < samplingRate;
}

// 미들웨어에서 사용
app.use((req, res, next) => {
  const requestId = req.headers['x-request-id'] || uuidv4();
  req.requestId = requestId;
  
  // 샘플링 여부 결정
  req.shouldTrace = shouldSample(requestId);
  
  // 모든 요청에 대해 기본 메트릭은 항상 수집
  collectBasicMetrics(req, res);
  
  // 샘플링된 요청에 대해서만 상세 추적 활성화
  if (req.shouldTrace) {
    enableDetailedTracing(req, res);
  }
  
  next();
});

보안 모니터링 통합

API 모니터링은 성능뿐만 아니라 보안 측면도 고려해야 한다.

해결 방법:

  • 비정상 패턴 감지: 비정상적인 요청 패턴 모니터링
  • 속도 제한 모니터링: 속도 제한 위반 추적
  • 인증 실패 모니터링: 인증 오류 패턴 분석
  • 데이터 유출 감지: 민감한 데이터 유출 모니터링
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# API 보안 모니터링 예시
def monitor_security_events(request, response):
    # 인증 실패 모니터링
    if response.status_code == 401:
        increment_counter('api.auth.failure', tags={
            'ip': request.remote_addr,
            'user_agent': request.user_agent.string,
            'endpoint': request.path
        })
        
        # 동일 IP에서의 인증 실패 횟수 확인
        failure_count = get_recent_failures(request.remote_addr)
        if failure_count > THRESHOLD_SUSPICIOUS_AUTH_FAILURES:
            log_security_event('excessive_auth_failures', {
                'ip': request.remote_addr,
                'count': failure_count,
                'time_window': 'last 1 hour'
            })
    
    # API 속도 제한 모니터링
    if response.status_code == 429:
        increment_counter('api.rate_limit.exceeded', tags={
            'client_id': get_client_id(request),
            'endpoint': request.path
        })
        
        # 속도 제한 초과 패턴 분석
        if is_abnormal_rate_pattern(get_client_id(request)):
            log_security_event('suspicious_rate_limit_pattern', {
                'client_id': get_client_id(request),
                'pattern': 'burst_traffic'
            })
    
    # 데이터 유출 의심 모니터링 (비정상적으로 큰 응답)
    if response.content_length > THRESHOLD_LARGE_RESPONSE:
        log_security_event('large_data_transfer', {
            'client_id': get_client_id(request),
            'endpoint': request.path,
            'size': response.content_length
        })

마이크로서비스 환경에서의 모니터링

마이크로서비스 아키텍처는 분산된 서비스의 모니터링 복잡성을 증가시킨다.

해결 방법:

  • 상관 ID 전파: 요청 추적을 위한 고유 식별자 전달
  • 서비스 맵 생성: 서비스 간 종속성 시각화
  • 분산 로깅: 중앙 집중식 로그 수집 및 분석
  • 서비스 메시: Istio, Linkerd 등의 서비스 메시를 통한 관찰성 향상
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 상관 ID 전파 예시 (Java/Spring)
@Component
public class CorrelationIdFilter extends OncePerRequestFilter {

    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, 
                                   FilterChain filterChain) throws ServletException, IOException {
        String correlationId = request.getHeader(CORRELATION_ID_HEADER);
        
        // 상관 ID가 없으면 새로 생성
        if (correlationId == null || correlationId.isEmpty()) {
            correlationId = UUID.randomUUID().toString();
        }
        
        // MDC에 상관 ID 설정 (로깅용)
        MDC.put("correlationId", correlationId);
        
        // 응답 헤더에 상관 ID 추가
        response.addHeader(CORRELATION_ID_HEADER, correlationId);
        
        try {
            // 다운스트림 서비스 호출 시 상관 ID 전달을 위한 설정
            RequestContextHolder.currentRequestAttributes()
                .setAttribute("correlationId", correlationId, RequestAttributes.SCOPE_REQUEST);
            
            filterChain.doFilter(request, response);
        } finally {
            MDC.remove("correlationId");
        }
    }
}

// RestTemplate에 상관 ID 추가
@Configuration
public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(
            Collections.singletonList((request, body, execution) -> {
                String correlationId = (String) RequestContextHolder.currentRequestAttributes()
                    .getAttribute("correlationId", RequestAttributes.SCOPE_REQUEST);
                
                if (correlationId != null) {
                    request.getHeaders().add("X-Correlation-ID", correlationId);
                }
                
                return execution.execute(request, body);
            })
        );
        return restTemplate;
    }
}

API 모니터링 고급 기법

이상 탐지 및 예측 분석

정상적인 패턴에서 벗어난 비정상적인 API 동작을 자동으로 감지하고 미래 문제를 예측한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 이상 탐지 예시 (Python/SciPy)
from scipy import stats
import numpy as np

class AnomalyDetector:
    def __init__(self, window_size=60, threshold=3.0):
        self.window_size = window_size  # 분석할 이전 데이터 포인트 수
        self.threshold = threshold      # 이상 감지 z-score 임계값
        self.history = []               # 이전 데이터 포인트 저장
    
    def is_anomaly(self, value):
        """현재 값이 이상치인지 확인"""
        self.history.append(value)
        
        # 충분한 데이터가 없으면 이상으로 간주하지 않음
        if len(self.history) < self.window_size:
            return False
        
        # 윈도우 크기 유지
        if len(self.history) > self.window_size:
            self.history.pop(0)
        
        # z-score 계산
        mean = np.mean(self.history[:-1])  # 현재 값을 제외한 평균
        std = np.std(self.history[:-1])    # 현재 값을 제외한 표준편차
        
        # 표준편차가 0에 가까우면 일정한 값이므로 이상이 아님
        if std < 1e-10:
            return False
        
        z_score = abs((value - mean) / std)
        
        return z_score > self.threshold

# 사용 예시
response_time_detector = AnomalyDetector(window_size=100, threshold=3.5)

def monitor_api_response_time(endpoint, response_time):
    if response_time_detector.is_anomaly(response_time):
        log.warning(
            f"Anomalous response time detected for {endpoint}: {response_time}ms "
            f"(threshold: {response_time_detector.threshold} standard deviations)"
        )
        
        # 알림 전송
        send_alert(
            title=f"Anomalous API Response Time: {endpoint}",
            message=f"Response time of {response_time}ms is abnormally high.",
            severity="warning"
        )

지능형 알림 전략

컨텍스트 인식 알림과 자동화된 문제 해결 방법을 구현한다.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
# 지능형 알림 시스템 예시
class SmartAlertSystem:
    def __init__(self):
        # 알림 상태 저장
        self.active_alerts = {}
        # 알림 에스컬레이션 상태
        self.escalations = {}
        # 자동 해결 동작
        self.auto_remediation_actions = {
            'high_error_rate': self.restart_service,
            'memory_leak': self.increase_memory_limit,
            'database_connection_issues': self.check_db_connections
        }
    
    def process_alert(self, alert_type, severity, context):
        """알림 처리 및 지능형 라우팅"""
        alert_key = f"{alert_type}:{context.get('service', 'unknown')}"
        
        # 이미 활성화된 알림인지 확인
        if alert_key in self.active_alerts:
            # 기존 알림 상태 업데이트
            self.active_alerts[alert_key]['count'] += 1
            self.active_alerts[alert_key]['last_seen'] = time.time()
            
            # 알림이 지속되면 에스컬레이션
            duration = time.time() - self.active_alerts[alert_key]['first_seen']
            if duration > 900 and alert_key not in self.escalations:  # 15분 이상 지속
                self.escalate_alert(alert_key, context)
        else:
            # 새로운 알림 생성
            self.active_alerts[alert_key] = {
                'type': alert_type,
                'severity': severity,
                'context': context,
                'count': 1,
                'first_seen': time.time(),
                'last_seen': time.time()
            }
            
            # 즉시 알림
            self.send_initial_alert(alert_key)
            
            # 자동 해결 시도
            self.attempt_auto_remediation(alert_type, context)
    
    def escalate_alert(self, alert_key, context):
        """알림 에스컬레이션 - 더 높은 우선순위로 라우팅"""
        alert = self.active_alerts[alert_key]
        
        # 에스컬레이션 상태 저장
        self.escalations[alert_key] = {
            'time': time.time(),
            'previous_severity': alert['severity']
        }
        
        # 심각도 상향 조정
        new_severity = 'critical' if alert['severity'] == 'warning' else 'emergency'
        alert['severity'] = new_severity
        
        # 상향된 심각도로 알림 전송
        message = (
            f"ESCALATED ALERT: {alert['type']} issue has been ongoing for "
            f"{int((time.time() - alert['first_seen']) / 60)} minutes. "
            f"Occurred {alert['count']} times."
        )
        
        # 에스컬레이션 경로에 따라 알림 전송
        self.send_escalated_alert(alert_key, message, new_severity)
    
    def attempt_auto_remediation(self, alert_type, context):
        """자동 문제 해결 시도"""
        if alert_type in self.auto_remediation_actions:
            try:
                result = self.auto_remediation_actions[alert_type](context)
                
                if result['success']:
                    log.info(
                        f"Auto-remediation for {alert_type} successful: {result['message']}"
                    )
                else:
                    log.warning(
                        f"Auto-remediation for {alert_type} failed: {result['message']}"
                    )
            except Exception as e:
                log.error(f"Error during auto-remediation for {alert_type}: {str(e)}")
    
    # 자동 해결 동작 구현
    def restart_service(self, context):
        service_name = context.get('service')
        # 서비스 재시작 로직…
        return {'success': True, 'message': f"Service {service_name} restarted successfully"}
    
    def increase_memory_limit(self, context):
        service_name = context.get('service')
        # 메모리 한도 증가 로직…
        return {'success': True, 'message': f"Memory limit for {service_name} increased"}
    
    def check_db_connections(self, context):
        # 데이터베이스 연결 풀 확인 및 리셋 로직…
        return {'success': True, 'message': "Database connection pool reset"}

사용자 행동 분석

API 사용 패턴을 분석하여 사용자 행동과 요구사항을 더 잘 이해한다.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// API 사용자 행동 분석 예시 (Node.js)
class APIUsageAnalytics {
  constructor() {
    this.sessionData = {};  // 세션별 데이터 저장
    this.userPatterns = {}; // 사용자별 패턴 저장
  }
  
  trackRequest(userId, sessionId, endpoint, method, timestamp) {
    // 세션 데이터 초기화
    if (!this.sessionData[sessionId]) {
      this.sessionData[sessionId] = {
        userId,
        startTime: timestamp,
        lastActivity: timestamp,
        requests: [],
        endpoints: new Set()
      };
    }
    
    // 요청 추적
    this.sessionData[sessionId].requests.push({
      endpoint,
      method,
      timestamp
    });
    
    this.sessionData[sessionId].endpoints.add(endpoint);
    this.sessionData[sessionId].lastActivity = timestamp;
    
    // 사용자 패턴 업데이트
    if (!this.userPatterns[userId]) {
      this.userPatterns[userId] = {
        frequentEndpoints: {},
        typicalSessionLength: [],
        commonFlows: {}
      };
    }
    
    // 엔드포인트 사용 빈도 추적
    const endpoints = this.userPatterns[userId].frequentEndpoints;
    endpoints[endpoint] = (endpoints[endpoint] || 0) + 1;
    
    // 흐름 분석 (순차적 API 호출 패턴)
    const session = this.sessionData[sessionId];
    if (session.requests.length >= 2) {
      const prevRequest = session.requests[session.requests.length - 2];
      const flow = `${prevRequest.endpoint} -> ${endpoint}`;
      
      const flows = this.userPatterns[userId].commonFlows;
      flows[flow] = (flows[flow] || 0) + 1;
    }
  }
  
  analyzeSession(sessionId) {
    const session = this.sessionData[sessionId];
    if (!session) return null;
    
    const sessionLength = session.lastActivity - session.startTime;
    const userId = session.userId;
    
    // 세션 길이 추적
    this.userPatterns[userId].typicalSessionLength.push(sessionLength);
    
    // 세션 분석 결과 반환
    return {
      userId,
      duration: sessionLength,
      requestCount: session.requests.length,
      uniqueEndpoints: session.endpoints.size,
      mostFrequentEndpoint: this.getMostFrequent(
        session.requests.map(r => r.endpoint)
      )
    };
  }
  
  getUserInsights(userId) {
    const patterns = this.userPatterns[userId];
    if (!patterns) return null;
    
    // 가장 자주 사용하는 엔드포인트
    const frequentEndpoints = Object.entries(patterns.frequentEndpoints)
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5);
    
    // 평균 세션 길이
    const avgSessionLength = patterns.typicalSessionLength.length > 0
      ? patterns.typicalSessionLength.reduce((sum, len) => sum + len, 0) / patterns.typicalSessionLength.length
      : 0;
    
    // 일반적인 API 사용 흐름
    const commonFlows = Object.entries(patterns.commonFlows)
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5);
    
    return {
      userId,
      topEndpoints: frequentEndpoints,
      averageSessionLength: avgSessionLength,
      typicalFlows: commonFlows
    };
  }
  
  getMostFrequent(arr) {
    const counts = {};
    let maxItem = null;
    let maxCount = 0;
    
    for (const item of arr) {
      counts[item] = (counts[item] || 0) + 1;
      if (counts[item] > maxCount) {
        maxItem = item;
        maxCount = counts[item];
      }
    }
    
    return maxItem;
  }
}

API 모니터링을 위한 조직 문화 및 프로세스

관찰성 문화 구축

모니터링에 그치지 않고 더 넓은 관찰성(Observability) 문화를 조직에 정착시킨다.

핵심 원칙:

  • 관찰성 중심 설계: API 설계 단계부터 관찰성 고려
  • 로그, 메트릭, 트레이스의 통합: “3대 관찰성 기둥” 통합 접근법
  • 가시성 민주화: 모든 팀원이 모니터링 도구에 접근 가능하게 함
  • 지속적 개선: 인시던트에서 배운 교훈을 통해 모니터링 전략 개선

SRE(Site Reliability Engineering) 원칙 적용

Google의 SRE 원칙을 API 모니터링에 적용한다.

주요 개념:

  • 서비스 수준 목표(SLO) 정의: 명확한 성능 및 가용성 목표 설정
  • 오류 예산: 허용 가능한 서비스 중단 또는 성능 저하의 한도 설정
  • 점진적 알림: 심각도에 따른 단계적 알림 전략
  • 포스트모템 문화: 장애 후 비난 없는 분석 및 학습
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# SLO 정의 예시 (yaml 형식)
service: user-api
slos:
  - name: availability
    target: 99.9%  # 월간 목표 가용성
    metric: api_success_rate
    measurement: ratio of successful to total requests
    window: 30d  # 30일 측정 기간
    
  - name: latency
    target: 95%  # 요청의 95%가 임계값 이내여야 함
    metric: api_request_duration_ms
    measurement: percentage of requests under threshold
    threshold: 300ms  # 300밀리초 임계값
    window: 30d  # 30일 측정 기간

error_budget:
  policy:
    # 오류 예산의 50%를 소진하면 릴리스 속도 줄이기
    - consumption: 50%
      action: "Reduce release velocity by 50%"
      
    # 오류 예산의 75%를 소진하면 예정된 기능 릴리스 중단
    - consumption: 75%
      action: "Pause feature releases, focus on reliability improvements"
      
    # 오류 예산의 90%를 소진하면 긴급 대응 모드 진입
    - consumption: 90%
      action: "Enter emergency response mode, all teams focus on reliability"

3. DevOps 통합 및 자동화

CI/CD 파이프라인에 모니터링 및 알림을 통합하여 개발과 운영 사이의 격차를 줄인다.

핵심 전략:

  1. 배포 전 성능 테스트: 자동화된 성능 테스트를, 빌드 프로세스에 통합
  2. 카나리 배포: 소규모 사용자 그룹에 새 버전을 점진적으로 출시하며 모니터링
  3. 자동 롤백: 성능 저하 시 자동으로 이전 버전으로 롤백
  4. 인프라스트럭처 코드화: 모니터링 구성도 코드화하여 버전 관리
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# GitHub Actions를 사용한 API 모니터링 CI/CD 통합 예시
name: API Deploy and Monitor

on:
  push:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      
      - name: Setup environment
        run: docker-compose up -d
      
      - name: Run unit and integration tests
        run: npm test
      
      - name: Performance baseline test
        run: |
          npm run perf-test
          node scripts/analyze-performance.js --fail-threshold=10
  
  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      
      - name: Deploy to 10% of production (canary)
        run: ./deploy.sh --environment=production --canary=10
      
      - name: Monitor canary deployment
        run: |
          # 15분 동안 카나리 배포 모니터링
          ./monitor-deployment.sh --duration=15m --environment=canary
      
      - name: Check error budget consumption
        run: |
          ERROR_BUDGET_CONSUMPTION=$(curl -s https://monitoring-api.example.com/error-budget)
          if [ $ERROR_BUDGET_CONSUMPTION -gt 80 ]; then
            echo "Error budget consumption too high: $ERROR_BUDGET_CONSUMPTION%"
            ./rollback.sh --environment=production
            exit 1
          fi
      
      - name: Complete rollout to production
        if: success()
        run: ./deploy.sh --environment=production --complete-rollout
      
      - name: Configure monitoring alerts
        run: |
          # 새로운 배포에 대한 모니터링 알림 설정
          ./configure-alerts.sh --version=$(git rev-parse --short HEAD)

용어 정리

용어설명
가용성(Availability)IT와 소프트웨어 시스템에서 서비스, 시스템, 또는 애플리케이션이 사용자에게 정상적으로 접근 가능하고 동작 가능한 상태를 유지하는 능력을 의미한다. 이는 시스템의 신뢰성과 안정성을 나타내는 중요한 지표 중 하나로, 특히 클라우드 컴퓨팅, 네트워크, 데이터센터, 웹 애플리케이션 등에서 핵심적인 성능 척도로 사용된다.
- 공식적인 정의:
가용성은 특정 시간 동안 시스템이 정상적으로 작동하여 사용자 요청을 처리할 수 있는 비율을 나타낸다.
수식:
가용성 (%)=(정상 작동 시간 / 전체 시간)×100
- 예시:
- 하루(24시간) 중 시스템이 23시간 55분 동안 정상 작동했다면:
가용성 (%)=(1435 / 1440)×100=99.65%
가용성의 중요성
1. 사용자 경험 개선:
- 높은 가용성은 서비스 중단 없이 사용자에게 안정적인 경험을 제공한다.
- 예: 전자상거래 사이트가 항상 접근 가능해야 고객이 구매를 완료할 수 있음.
2. 비즈니스 연속성 보장:
- 서비스 중단은 매출 손실과 고객 신뢰도 저하로 이어질 수 있다.
- 예: 금융 서비스나 의료 시스템에서 가용성 부족은 심각한 문제를 초래할 수 있음.
3. 서비스 수준 계약(SLA) 준수:
- 서비스 제공자는 SLA에서 명시된 가용성을 보장해야 하며, 이를 충족하지 못하면 페널티를 지불해야 할 수도 있다.

참고 및 출처