AI Engineering

AI 엔지니어링(Artificial Intelligence Engineering)은 인공지능 시스템을 설계, 개발, 배포 및 유지보수하는 체계적인 접근 방식이다. 이는 단순히 AI 모델을 훈련하는 것을 넘어서, 실제 비즈니스 환경에서 안정적으로 작동하는 AI 솔루션을 구축하는 전체 과정을 포함한다.

기존의 소프트웨어 엔지니어링이 소프트웨어 시스템을 개발하는 원칙과 방법론을 다룬다면, AI 엔지니어링은 여기에 데이터 관리, 모델 훈련, 불확실성 처리, 지속적인 학습 등 AI 시스템 특유의 측면을 추가한다.
AI 엔지니어링은 실험실에서 개발된 모델이 실제 환경에서 가치를 창출할 수 있도록 다리를 놓는 역할을 한다.

AI 엔지니어링은 연구 환경에서 개발된 AI 모델을 실제 비즈니스 환경에서 가치를 창출하는 운영 시스템으로 전환하는 핵심 분야이다. 이는 데이터 엔지니어링, 모델 개발, MLOps, 모니터링 및 피드백 루프를 포함하는 종합적인 접근 방식을 필요로 한다.

AI 엔지니어링의 필요성

AI 시스템은 전통적인 소프트웨어 시스템과 몇 가지 중요한 차이점이 있다:

  1. 데이터 중심: AI 시스템은 코드뿐만 아니라 데이터에 의해 동작이 결정된다.
  2. 비결정적 동작: 같은 입력에 대해 항상 동일한 출력을 보장하지 않을 수 있다.
  3. 지속적 학습 필요: 시간이 지남에 따라 데이터가 변하면 모델도 업데이트되어야 한다.
  4. 해석과 설명 과제: 모델의 결정을 이해하고 설명하는 것이 어려울 수 있다.

이러한 특성으로 인해, AI 시스템을 개발하고 운영하기 위한 별도의 엔지니어링 접근 방식이 필요하게 되었다.

AI 엔지니어링의 핵심 구성 요소

AI 엔지니어링은 다음과 같은 핵심 영역으로 구성된다:

데이터 엔지니어링

AI 시스템의 기반이 되는 데이터를 수집, 처리, 저장하는 과정을 관리한다.

  • 데이터 수집: 다양한 소스에서 데이터를 수집하는 파이프라인 구축
  • 데이터 전처리: 결측치 처리, 정규화, 특성 추출 등의 과정 자동화
  • 데이터 저장 및 관리: 효율적인 데이터 저장소 설계 및 버전 관리
  • 데이터 품질 관리: 일관성, 정확성, 완전성 등 데이터 품질 보장

예시 코드 - 데이터 파이프라인 구축:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Apache Airflow를 사용한 데이터 수집 파이프라인 예시
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# 기본 인자 설정
default_args = {
    'owner': 'ai_engineer',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
dag = DAG(
    'data_collection_pipeline',
    default_args=default_args,
    description='데이터 수집 및 전처리 파이프라인',
    schedule_interval=timedelta(days=1),
)

# 데이터 수집 함수
def collect_data(**context):
    # 데이터 소스에서 데이터 수집 로직
    print("데이터 수집 중…")
    # 수집된 데이터 반환

# 데이터 전처리 함수
def preprocess_data(**context):
    # 데이터 전처리 로직
    print("데이터 전처리 중…")
    # 전처리된 데이터 반환

# 데이터 검증 함수
def validate_data(**context):
    # 데이터 품질 검증 로직
    print("데이터 검증 중…")
    # 검증 결과 반환

# 태스크 생성 및 연결
collect_task = PythonOperator(
    task_id='collect_data',
    python_callable=collect_data,
    provide_context=True,
    dag=dag,
)

preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    provide_context=True,
    dag=dag,
)

validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    provide_context=True,
    dag=dag,
)

# 태스크 의존성 설정
collect_task >> preprocess_task >> validate_task

모델 개발 및 훈련

AI 모델을 설계하고 최적화하는 과정을 관리한다.

  • 모델 설계: 문제에 적합한 알고리즘 및 아키텍처 선택
  • 하이퍼파라미터 최적화: 최적의 모델 파라미터 탐색
  • 모델 평가: 정확성, 견고성, 공정성 등 다양한 측면에서 모델 평가
  • 실험 관리: 모델 실험 추적 및 재현성 보장

예시 코드 - 모델 훈련 및 실험 추적:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# PyTorch와 MLflow를 사용한 모델 훈련 및 실험 추적 예시
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import mlflow
import mlflow.pytorch

# 모델 정의
class SimpleNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleNN, self).__init__()
        self.layer1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        x = self.layer1(x)
        x = self.relu(x)
        x = self.layer2(x)
        return x

# 훈련 함수
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs):
    # MLflow 실험 시작
    mlflow.start_run()
    
    # 하이퍼파라미터 로깅
    mlflow.log_param("hidden_size", model.layer1.out_features)
    mlflow.log_param("learning_rate", optimizer.param_groups[0]['lr'])
    mlflow.log_param("epochs", epochs)
    
    for epoch in range(epochs):
        # 훈련 모드
        model.train()
        train_loss = 0
        for inputs, targets in train_loader:
            # 순전파
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            # 역전파 및 최적화
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # 평가 모드
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item()
                
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()
        
        # 메트릭 계산 및 로깅
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        accuracy = 100. * correct / total
        
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)
        mlflow.log_metric("accuracy", accuracy, step=epoch)
        
        print(f"Epoch {epoch+1}/{epochs}: "
              f"Train Loss: {train_loss:f}, "
              f"Val Loss: {val_loss:f}, "
              f"Accuracy: {accuracy:f}%")
    
    # 모델 저장
    mlflow.pytorch.log_model(model, "model")
    mlflow.end_run()
    
    return model

# 모델 훈련 실행
input_size = 784  # MNIST 이미지 크기
hidden_size = 128
output_size = 10  # 0-9 숫자 분류
model = SimpleNN(input_size, hidden_size, output_size)

# 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 데이터 로더 설정 (실제 코드에서는 실제 데이터 사용)
# train_loader = DataLoader(…)
# val_loader = DataLoader(…)

# 모델 훈련
# trained_model = train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10)

모델 배포 및 제공

개발된 AI 모델을 실제 환경에 배포하고 서비스하는 과정을 관리한다.

  • 모델 패키징: 모델과 의존성을 함께 패키징하여 이식성 확보
  • 서빙 인프라: 모델 서빙을 위한 확장 가능한 인프라 구축
  • API 설계: 모델과 상호작용하기 위한 인터페이스 설계
  • 성능 최적화: 지연 시간 및 처리량 최적화

예시 코드 - 모델 배포:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Flask와 Docker를 사용한 모델 배포 예시

# app.py
from flask import Flask, request, jsonify
import torch
import torch.nn as nn
import numpy as np

app = Flask(__name__)

# 모델 정의 (실제 배포에서는 저장된 모델을 로드)
class SimpleNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleNN, self).__init__()
        self.layer1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        x = self.layer1(x)
        x = self.relu(x)
        x = self.layer2(x)
        return x

# 모델 로드
def load_model():
    model = SimpleNN(784, 128, 10)
    model.load_state_dict(torch.load('model.pth'))
    model.eval()
    return model

model = load_model()

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # 요청에서 데이터 추출
        data = request.json
        input_data = np.array(data['input'])
        
        # 입력 데이터 전처리
        input_tensor = torch.FloatTensor(input_data)
        
        # 예측 수행
        with torch.no_grad():
            output = model(input_tensor)
            probabilities = torch.nn.functional.softmax(output, dim=0)
            predicted_class = torch.argmax(probabilities).item()
        
        # 결과 반환
        return jsonify({
            'predicted_class': predicted_class,
            'probabilities': probabilities.tolist()
        })
    
    except Exception as e:
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 5000

CMD ["python", "app.py"]

MLOps 및 자동화

AI 모델의 개발, 테스트, 배포, 모니터링 과정을 자동화하는 CI/CD 파이프라인을 구축한다.

  • 지속적 통합(CI): 코드 및 데이터 변경 시 자동 테스트
  • 지속적 배포(CD): 검증된 모델을 자동으로 배포
  • 모델 버전 관리: 모델과 데이터의 버전 및 계보 추적
  • 인프라스트럭처 as 코드(IaC): 인프라 설정을 코드로 관리

예시 코드 - CI/CD 파이프라인:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# GitHub Actions를 사용한 CI/CD 파이프라인 예시
# .github/workflows/model-pipeline.yml

name: Model Training and Deployment Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Run tests
      run: |
        pytest tests/

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Train model
      run: |
        python scripts/train.py
    - name: Evaluate model
      run: |
        python scripts/evaluate.py
    - name: Upload model artifact
      uses: actions/upload-artifact@v2
      with:
        name: model-artifact
        path: models/model.pth

  deploy:
    needs: train
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Download model artifact
      uses: actions/download-artifact@v2
      with:
        name: model-artifact
        path: models/
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v1
    - name: Login to DockerHub
      uses: docker/login-action@v1
      with:
        username: ${{ secrets.DOCKERHUB_USERNAME }}
        password: ${{ secrets.DOCKERHUB_TOKEN }}
    - name: Build and push
      uses: docker/build-push-action@v2
      with:
        context: .
        push: true
        tags: user/model-api:latest

모델 모니터링 및 피드백 루프

배포된 모델의 성능을 지속적으로 모니터링하고 필요에 따라 업데이트하는 과정을 관리한다.

  • 성능 모니터링: 정확도, 예측 지연 시간 등의 측정 및 추적
  • 데이터 드리프트 감지: 입력 데이터 변화 감지 및 대응
  • 개념 드리프트 감지: 데이터와 대상 간 관계 변화 감지
  • A/B 테스트: 새 모델과 기존 모델 비교 실험

예시 코드 - 모델 모니터링:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Prometheus와 Grafana를 사용한 모델 모니터링 예시
from flask import Flask, request, jsonify
import torch
import numpy as np
from prometheus_client import Counter, Histogram, Gauge, Summary, start_http_server
import time

app = Flask(__name__)

# Prometheus 메트릭 정의
PREDICTION_COUNT = Counter('model_predictions_total', 'Total number of predictions')
PREDICTION_LATENCY = Histogram('model_prediction_latency_seconds', 'Prediction latency in seconds')
PREDICTION_ERRORS = Counter('model_prediction_errors_total', 'Total number of prediction errors')
MODEL_ACCURACY = Gauge('model_accuracy', 'Current model accuracy')
INPUT_DRIFT = Gauge('input_drift_score', 'Input data drift score')

# 모델 로드 및 설정
# (이전 예시 참조)

@app.route('/predict', methods=['POST'])
def predict():
    start_time = time.time()
    
    try:
        # 요청에서 데이터 추출
        data = request.json
        input_data = np.array(data['input'])
        
        # 입력 데이터 드리프트 감지
        drift_score = check_data_drift(input_data)
        INPUT_DRIFT.set(drift_score)
        
        # 예측 수행
        with torch.no_grad():
            input_tensor = torch.FloatTensor(input_data)
            output = model(input_tensor)
            probabilities = torch.nn.functional.softmax(output, dim=0)
            predicted_class = torch.argmax(probabilities).item()
        
        # 메트릭 업데이트
        PREDICTION_COUNT.inc()
        
        # 결과 반환
        response = {
            'predicted_class': predicted_class,
            'probabilities': probabilities.tolist()
        }
        
        return jsonify(response)
    
    except Exception as e:
        PREDICTION_ERRORS.inc()
        return jsonify({'error': str(e)}), 400
    
    finally:
        # 예측 지연 시간 측정
        latency = time.time() - start_time
        PREDICTION_LATENCY.observe(latency)

def check_data_drift(input_data):
    # 입력 데이터 드리프트 감지 로직
    # 기준 데이터와 현재 데이터의 통계적 특성 비교
    # 간단한 예시로, 평균 차이를 기반으로 점수 계산
    reference_mean = 0.5  # 기준 데이터의 평균 (예시 값)
    current_mean = np.mean(input_data)
    
    drift_score = abs(current_mean - reference_mean)
    return drift_score

# 모델 정확도 업데이트 함수 (주기적으로 호출)
def update_model_accuracy():
    # 테스트 데이터로 모델 정확도 평가
    accuracy = evaluate_model_on_test_data()
    MODEL_ACCURACY.set(accuracy)

# 메트릭 서버 시작
start_http_server(8000)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

AI 엔지니어링의 필수 기술과 도구

  1. 프로그래밍 언어 및 프레임워크:
    AI 엔지니어링에 필요한 주요 프로그래밍 언어와 프레임워크:

    • Python: AI 개발의 사실상 표준 언어
    • R: 통계 분석 및 시각화에 강점
    • Julia: 고성능 연산에 최적화된 언어
    • TensorFlow, PyTorch: 딥러닝 모델 개발에 사용되는 주요 프레임워크
    • scikit-learn: 전통적인 머신러닝 알고리즘 구현
    • Spark MLlib: 분산 머신러닝을 위한 라이브러리
  2. 데이터 엔지니어링 도구
    데이터 처리 및 관리를 위한 도구:

    • SQL, NoSQL 데이터베이스: 데이터 저장 및 쿼리
    • Apache Spark, Hadoop: 대규모 데이터 처리
    • Kafka, RabbitMQ: 데이터 스트리밍 및 메시징
    • Airflow, Luigi: 데이터 파이프라인 오케스트레이션
    • dbt (data build tool): 데이터 변환 및 관리
  3. MLOps 도구
    AI 시스템의 개발 및 운영을 위한 도구:

    • Git, GitHub: 코드 버전 관리
    • DVC (Data Version Control): 데이터 및 모델 버전 관리
    • MLflow, Weights & Biases: 실험 추적 및 모델 관리
    • Kubeflow, Metaflow: ML 워크플로우 오케스트레이션
    • Docker, Kubernetes: 컨테이너화 및 오케스트레이션
    • TensorFlow Serving, TorchServe, KFServing: 모델 서빙
    • Prometheus, Grafana: 모니터링 및 알림
  4. 클라우드 플랫폼 및 서비스
    클라우드 기반 AI 서비스:

    • AWS SageMaker: 모델 개발, 훈련, 배포를 위한 통합 플랫폼
    • Google AI Platform: 모델 개발 및 배포를 위한 Google의 서비스
    • Azure Machine Learning: Microsoft의 ML 개발 및 배포 서비스
    • IBM Watson: 다양한 AI 서비스를 제공하는 IBM의 플랫폼
    • Databricks: 협업 기반 데이터 분석 및 ML 플랫폼

AI 엔지니어링 프로세스

  1. 문제 정의 및 요구사항 분석
    AI 프로젝트의 출발점은 명확한 문제 정의와 요구사항 분석이다:
    1. 비즈니스 문제 이해: 해결하려는 문제와 비즈니스 목표 파악
    2. 요구사항 분석: 기능적, 비기능적 요구사항 정의
    3. 성공 기준 설정: 모델 성능 및 비즈니스 성과 지표 정의
    4. 제약 조건 식별: 리소스, 시간, 규제 등의 제약 확인
  2. 데이터 수집 및 준비
    AI 모델 개발에 필요한 데이터를 수집하고 준비하는 단계:
    1. 데이터 소스 식별: 관련 데이터 소스 식별 및 접근 방법 수립
    2. 데이터 수집: 다양한 소스에서 데이터 수집 및 통합
    3. 데이터 탐색: 데이터 특성, 분포, 품질 등 분석
    4. 데이터 전처리: 결측치 처리, 정규화, 특성 추출 등 수행
    5. 데이터 분할: 훈련, 검증, 테스트 세트로 데이터 분할
  3. 모델 개발 및 훈련
    AI 모델을 설계하고 훈련하는 단계:
    1. 모델 선택: 문제에 적합한 알고리즘 및 아키텍처 선택
    2. 초기 모델 훈련: 기본 설정으로 모델 훈련 및 평가
    3. 하이퍼파라미터 최적화: 모델 성능 향상을 위한 파라미터 튜닝
    4. 모델 평가: 다양한 메트릭으로 모델 성능 평가
    5. 모델 재훈련: 최적의 설정으로 전체 훈련 데이터셋에서 모델 재훈련
  4. 모델 배포 및 통합
    개발된 모델을 실제 환경에 배포하고 기존 시스템과 통합하는 단계:
    1. 배포 전략 수립: 온라인/오프라인, 배치/실시간 등 배포 방식 결정
    2. 모델 패키징: 모델과 의존성을 함께 패키징
    3. 서빙 인프라 구축: 확장성, 가용성, 성능을 고려한 인프라 설계
    4. API 개발: 모델과 상호작용하기 위한 인터페이스 개발
    5. 시스템 통합: 기존 시스템과의 통합 및 테스트
  5. 모니터링 및 유지보수
    배포된 모델의 성능을 모니터링하고 유지보수하는 단계:
    1. 성능 모니터링: 모델 정확도, 응답 시간 등 추적
    2. 데이터 드리프트 감지: 입력 데이터 변화 모니터링
    3. 개념 드리프트 감지: 데이터와 목표 변수 간 관계 변화 감지
    4. 모델 업데이트: 필요에 따라 모델 재훈련 및 업데이트
    5. 피드백 수집: 사용자 피드백 수집 및 분석

AI 엔지니어링의 도전 과제 및 모범 사례

  1. 데이터 관련 도전 과제

    • 불충분한 데이터: 모델 훈련에 필요한 데이터 부족
    • 편향된 데이터: 특정 그룹이나 상황을 과소/과대 표현하는 데이터
    • 낮은 데이터 품질: 결측치, 잡음, 불일치 등의 문제
    • 데이터 라벨링 비용: 지도학습을 위한 라벨링의 높은 비용과 시간
      모범 사례:
    • 데이터 증강(Data Augmentation) 기법 활용
    • 다양성과 공정성을 고려한 데이터 수집
    • 자동화된 데이터 품질 검증 파이프라인 구축
    • 준지도학습, 약지도학습 등의 대안적 접근법 고려
  2. 모델 개발 도전 과제

    • 과적합(Overfitting): 훈련 데이터에만 최적화된 모델
    • 모델 복잡성: 해석하기 어려운 블랙박스 모델
    • 일반화 능력: 새로운 데이터에 대한 성능 저하
    • 컴퓨팅 요구사항: 대규모 모델 훈련의 높은 컴퓨팅 요구
      모범 사례:
    • 정규화, 교차 검증 등 과적합 방지 기법 적용
    • 모델 설명 가능성(XAI) 기법 활용
    • 다양한 조건에서의 강건성 테스트
    • 모델 압축, 증류, 양자화 등의 최적화 기법 적용
  3. 배포 및 운영 도전 과제

    • 모델-서빙 간극: 연구 환경과 생산 환경 간의 차이로 인한 문제
    • 확장성: 트래픽 증가에 따른 확장 문제
    • 지연 시간: 실시간 응답이 필요한 애플리케이션의 성능 요구사항
    • 의존성 관리: 모델과 환경 의존성의 복잡성
      모범 사례:
    • 컨테이너화를 통한 환경 일관성 확보
    • 자동 스케일링 인프라 구축
    • 모델 최적화 및 캐싱 전략 도입
    • 의존성을 포함한 모델 패키징 및 버전 관리
  4. 모니터링 및 유지보수 도전 과제

    • 성능 저하 감지: 시간이 지남에 따른 모델 성능 저하 감지
    • 데이터 드리프트: 입력 데이터 분포의 변화 관리
    • 개념 드리프트: 데이터와 목표 변수 간 관계 변화 관리
    • 피드백 루프: 모델 업데이트를 위한 피드백 메커니즘 구축
      모범 사례:
    • 자동화된 모델 성능 모니터링 시스템 구축
    • 데이터 드리프트 감지 및 알림 메커니즘 구현
    • 정기적인 모델 재평가 및 재훈련 일정 수립
    • A/B 테스트를 통한 모델 업데이트 효과 검증
  5. 윤리 및 책임 AI 도전 과제

    • 편향과 공정성: 모델의 결정에서 특정 그룹에 대한 편향
    • 설명 가능성: 모델 결정에 대한 투명성 및 해석 가능성 부족
    • 프라이버시: 개인 정보 보호 및 데이터 보안 문제
    • 사회적 영향: AI 시스템의 잠재적 부정적 사회 영향
      모범 사례:
    • 공정성 메트릭을 통한 모델 평가 및 편향 감지
    • 설명 가능한 AI(XAI) 기법 및 도구 활용
    • 차등 프라이버시, 연합 학습 등의 프라이버시 보호 기법 적용
    • 다양한 이해관계자와의 협력 및 윤리적 영향 평가

AI 엔지니어링 프로젝트 예시

고객 이탈 예측 시스템

통신사나 구독 서비스 회사가 고객 이탈(Churn)을 예측하고 방지하기 위한 AI 시스템 구축 과정을 단계별로 살펴보자.

문제 정의 및 요구사항
  • 비즈니스 문제: 고객 이탈로 인한 수익 손실 최소화
  • AI 접근법: 고객 이탈 위험이 높은 고객을 식별하는 예측 모델
  • 성공 기준: 이탈 예측 정확도 85% 이상, 고객 유지율 10% 향상
  • 제약 조건: 일별 예측 업데이트, 90초 이내 마케팅 시스템 통합
데이터 파이프라인 구축
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# 데이터 파이프라인 예시 코드 (Airflow 사용)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import logging

default_args = {
    'owner': 'ai_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'customer_churn_data_pipeline',
    default_args=default_args,
    description='Customer churn data processing pipeline',
    schedule_interval='@daily',
)

def extract_customer_data(**kwargs):
    """고객 데이터 추출 함수"""
    try:
        # 데이터베이스에서 고객 데이터 추출
        # 실제 구현에서는 SQL 쿼리 또는 API 호출 사용
        logging.info("Extracting customer data...")
        
        # 예시 데이터 (실제로는 DB에서 가져옴)
        customers = pd.DataFrame({
            'customer_id': range(1000, 1100),
            'subscription_length': np.random.randint(1, 60, 100),
            'monthly_charges': np.random.uniform(20, 200, 100),
            'total_charges': np.random.uniform(100, 5000, 100),
            'contract_type': np.random.choice(['Monthly', 'One year', 'Two year'], 100),
            'payment_method': np.random.choice(['Credit card', 'Bank transfer', 'Electronic check'], 100),
            'tech_support_calls': np.random.randint(0, 15, 100),
            'last_active_days': np.random.randint(0, 30, 100),
        })
        
        # 결과 저장
        customers.to_csv('/tmp/raw_customer_data.csv', index=False)
        return '/tmp/raw_customer_data.csv'
    
    except Exception as e:
        logging.error(f"Error extracting customer data: {e}")
        raise

def validate_data(**kwargs):
    """데이터 검증 함수"""
    try:
        input_path = kwargs['ti'].xcom_pull(task_ids='extract_customer_data')
        logging.info(f"Validating data from {input_path}")
        
        # 데이터 로드
        df = pd.read_csv(input_path)
        
        # 데이터 검증 로직
        # 1. 필수 열 존재 확인
        required_columns = ['customer_id', 'subscription_length', 'monthly_charges']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")
        
        # 2. 중복 고객 ID 확인
        if df['customer_id'].duplicated().any():
            raise ValueError("Duplicate customer IDs found")
        
        # 3. 데이터 범위 확인
        if (df['monthly_charges'] < 0).any():
            raise ValueError("Negative monthly charges found")
        
        # 4. 결측치 확인
        missing_values = df.isnull().sum()
        if missing_values.sum() > 0:
            logging.warning(f"Missing values found: {missing_values[missing_values > 0]}")
        
        # 검증 통과 - 데이터 저장
        df.to_csv('/tmp/validated_customer_data.csv', index=False)
        return '/tmp/validated_customer_data.csv'
    
    except Exception as e:
        logging.error(f"Data validation error: {e}")
        raise

def transform_features(**kwargs):
    """특성 엔지니어링 함수"""
    try:
        input_path = kwargs['ti'].xcom_pull(task_ids='validate_data')
        logging.info(f"Transforming features from {input_path}")
        
        # 데이터 로드
        df = pd.read_csv(input_path)
        
        # 특성 엔지니어링
        # 1. 범주형 변수 원-핫 인코딩
        df = pd.get_dummies(df, columns=['contract_type', 'payment_method'], drop_first=True)
        
        # 2. 새로운 특성 생성
        df['avg_monthly_charge'] = df['total_charges'] / df['subscription_length']
        df['inactive_ratio'] = df['last_active_days'] / 30  # 활동 비율 계산
        
        # 3. 수치형 특성 정규화
        scaler = StandardScaler()
        numeric_features = ['subscription_length', 'monthly_charges', 'total_charges', 
                           'tech_support_calls', 'avg_monthly_charge']
        df[numeric_features] = scaler.fit_transform(df[numeric_features])
        
        # 4. 특성 선택 (중요한 특성만 유지)
        selected_columns = ['customer_id', 'subscription_length', 'monthly_charges', 
                           'tech_support_calls', 'last_active_days', 'inactive_ratio', 
                           'avg_monthly_charge'] + [col for col in df.columns if 'contract_type' in col 
                                                  or 'payment_method' in col]
        df = df[selected_columns]
        
        # 변환된 데이터 저장
        df.to_csv('/tmp/features_customer_data.csv', index=False)
        return '/tmp/features_customer_data.csv'
    
    except Exception as e:
        logging.error(f"Feature transformation error: {e}")
        raise

# DAG 태스크 정의
extract_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    provide_context=True,
    dag=dag,
)

validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    provide_context=True,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_features',
    python_callable=transform_features,
    provide_context=True,
    dag=dag,
)

# 태스크 의존성 설정
extract_task >> validate_task >> transform_task
모델 개발 및 훈련
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# 모델 훈련 및 평가 예시 코드 (MLflow 사용)
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import mlflow
import mlflow.sklearn
import logging

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def load_train_data(path='/tmp/features_customer_data.csv', target_path='/tmp/customer_churn_labels.csv'):
    """훈련 데이터 및 레이블 로드"""
    try:
        # 특성 데이터 로드
        features = pd.read_csv(path)
        
        # 타겟 데이터 로드 (이탈 여부)
        # 실제 구현에서는 이 데이터가 별도 소스에서 올 수 있음
        targets = pd.read_csv(target_path)
        
        # 고객 ID로 데이터 병합
        data = features.merge(targets, on='customer_id')
        
        # ID 열 분리
        X = data.drop(['customer_id', 'churn'], axis=1)
        y = data['churn']
        
        return X, y
    
    except Exception as e:
        logger.error(f"Error loading training data: {e}")
        raise

def train_evaluate_model(X, y, model_type='random_forest', experiment_name='churn_prediction'):
    """모델 훈련 및 평가"""
    try:
        # MLflow 실험 설정
        mlflow.set_experiment(experiment_name)
        
        # 데이터 분할
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # MLflow 실행 시작
        with mlflow.start_run() as run:
            # 모델 선택
            if model_type == 'random_forest':
                model = RandomForestClassifier(n_estimators=100, random_state=42)
                params = {
                    'n_estimators': 100,
                    'max_depth': None,
                    'min_samples_split': 2,
                    'min_samples_leaf': 1
                }
            elif model_type == 'gradient_boosting':
                model = GradientBoostingClassifier(n_estimators=100, random_state=42)
                params = {
                    'n_estimators': 100,
                    'learning_rate': 0.1,
                    'max_depth': 3
                }
            elif model_type == 'logistic_regression':
                model = LogisticRegression(max_iter=1000, random_state=42)
                params = {
                    'C': 1.0,
                    'penalty': 'l2',
                    'solver': 'lbfgs'
                }
            else:
                raise ValueError(f"Unsupported model type: {model_type}")
            
            # 파라미터 로깅
            mlflow.log_params(params)
            
            # 모델 훈련
            model.fit(X_train, y_train)
            
            # 예측
            y_pred = model.predict(X_test)
            y_prob = model.predict_proba(X_test)[:, 1]
            
            # 평가 메트릭 계산
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred)
            recall = recall_score(y_test, y_pred)
            f1 = f1_score(y_test, y_pred)
            roc_auc = roc_auc_score(y_test, y_prob)
            
            # 메트릭 로깅
            metrics = {
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'roc_auc': roc_auc
            }
            mlflow.log_metrics(metrics)
            
            # 모델 저장
            mlflow.sklearn.log_model(model, "model")
            
            # 결과 출력
            logger.info(f"Model training completed with metrics: {metrics}")
            
            # 교차 검증 수행
            cv_scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
            mlflow.log_metric("cv_roc_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_roc_auc_std", cv_scores.std())
            
            logger.info(f"Cross-validation ROC AUC: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")
            
            return {
                'model': model,
                'metrics': metrics,
                'run_id': run.info.run_id
            }
    
    except Exception as e:
        logger.error(f"Error in model training and evaluation: {e}")
        raise

def main():
    """메인 실행 함수"""
    try:
        # 데이터 로드
        logger.info("Loading training data...")
        X, y = load_train_data()
        
        # 여러 모델 학습 및 평가
        models = ['random_forest', 'gradient_boosting', 'logistic_regression']
        results = {}
        
        for model_type in models:
            logger.info(f"Training and evaluating {model_type} model...")
            results[model_type] = train_evaluate_model(X, y, model_type)
        
        # 최고 성능 모델 선택
        best_model_type = max(results, key=lambda k: results[k]['metrics']['roc_auc'])
        best_model_results = results[best_model_type]
        
        logger.info(f"Best model: {best_model_type} with ROC AUC: {best_model_results['metrics']['roc_auc']:.4f}")
        
        # 최고 성능 모델 등록 (모델 레지스트리 사용)
        mlflow.set_tracking_uri("http://localhost:5000")  # MLflow 서버 주소
        model_name = "customer_churn_predictor"
        model_version = mlflow.register_model(
            f"runs:/{best_model_results['run_id']}/model",
            model_name
        )
        
        logger.info(f"Registered model: {model_name} version {model_version.version}")
        
        return best_model_results
    
    except Exception as e:
        logger.error(f"Error in main function: {e}")
        raise

if __name__ == "__main__":
    main()
모델 배포 및 API 구현
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# FastAPI를 사용한 모델 서빙 예시 코드
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Optional
import pandas as pd
import numpy as np
import pickle
import json
import logging
from datetime import datetime
import mlflow.pyfunc
import redis
import os
from prometheus_client import Counter, Histogram, Gauge
import time

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Redis 연결 설정
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    db=0
)

# Prometheus 메트릭 정의
PREDICTION_COUNT = Counter('churn_predictions_total', 'Total number of churn predictions', ['result'])
PREDICTION_LATENCY = Histogram('churn_prediction_latency_seconds', 'Churn prediction latency in seconds')
MODEL_ACCURACY = Gauge('churn_model_accuracy', 'Current churn model accuracy')

# 기본 정확도 설정 (모니터링 시스템에서 주기적으로 업데이트됨)
MODEL_ACCURACY.set(0.85)

# FastAPI 앱 초기화
app = FastAPI(title="Customer Churn Prediction API")

# 요청 및 응답 모델
class CustomerFeatures(BaseModel):
    customer_id: str
    subscription_length: int
    monthly_charges: float
    total_charges: float
    contract_type: str
    payment_method: str
    tech_support_calls: int
    last_active_days: int

class PredictionResponse(BaseModel):
    customer_id: str
    churn_probability: float
    churn_risk_level: str
    model_version: str
    prediction_id: str
    prediction_time: str

# 모델 로드 함수
def load_model():
    """MLflow에서 최신 모델 로드"""
    try:
        # 모델 레지스트리에서 'Production' 단계의 모델 로드
        model = mlflow.pyfunc.load_model("models:/customer_churn_predictor/Production")
        logger.info("Successfully loaded churn prediction model")
        return model
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        # 백업 방법: 로컬에 저장된 모델 파일 로드
        with open('models/churn_model_backup.pkl', 'rb') as f:
            model = pickle.load(f)
        logger.info("Loaded backup model")
        return model

# 의존성 주입을 위한 모델 제공자
def get_model():
    """싱글톤 패턴으로 모델 제공"""
    if not hasattr(get_model, "model"):
        get_model.model = load_model()
    return get_model.model

# 전처리 함수
def preprocess_features(customer: CustomerFeatures) -> pd.DataFrame:
    """요청 데이터 전처리"""
    # 데이터 프레임 변환
    data = pd.DataFrame([customer.dict()])
    
    # 1. 범주형 변수 처리
    contract_types = ['Monthly', 'One year', 'Two year']
    payment_methods = ['Credit card', 'Bank transfer', 'Electronic check']
    
    # 원-핫 인코딩
    for contract in contract_types[1:]:  # 첫 번째 카테고리는 기준으로 사용
        data[f'contract_type_{contract}'] = (data['contract_type'] == contract).astype(int)
    
    for method in payment_methods[1:]:
        data[f'payment_method_{method}'] = (data['payment_method'] == method).astype(int)
    
    # 2. 새로운 특성 생성
    data['avg_monthly_charge'] = data['total_charges'] / data['subscription_length']
    data['inactive_ratio'] = data['last_active_days'] / 30
    
    # 3. 원본 범주형 변수 제거
    data = data.drop(['contract_type', 'payment_method'], axis=1)
    
    # 고객 ID 별도 저장
    customer_id = data['customer_id']
    data = data.drop(['customer_id'], axis=1)
    
    return data, customer_id

# 예측 결과를 Redis에 캐싱하는 함수
def cache_prediction(customer_id: str, prediction: dict):
    """예측 결과를 Redis에 캐싱"""
    try:
        # 12시간 유효기간으로 캐싱
        redis_client.setex(
            f"churn_prediction:{customer_id}",
            43200,  # 12시간 (초 단위)
            json.dumps(prediction)
        )
    except Exception as e:
        logger.warning(f"Failed to cache prediction: {e}")

# 캐시된 예측 결과 조회 함수
def get_cached_prediction(customer_id: str) -> Optional[dict]:
    """Redis에서 캐시된 예측 결과 조회"""
    try:
        cached = redis_client.get(f"churn_prediction:{customer_id}")
        if cached:
            return json.loads(cached)
        return None
    except Exception as e:
        logger.warning(f"Failed to retrieve cached prediction: {e}")
        return None

# 마케팅 시스템에 알림 전송 함수 (백그라운드 작업)
def send_to_marketing_system(customer_id: str, churn_probability: float, risk_level: str):
    """고위험 고객 정보를 마케팅 시스템에 전송"""
    try:
        # 실제 구현에서는 외부 시스템 API 호출
        logger.info(f"Sending alert to marketing system for customer {customer_id}")
        # requests.post("https://marketing-api.example.com/churn-alerts",
        #     json={
        #         "customer_id": customer_id,
        #         "churn_probability": churn_probability,
        #         "risk_level": risk_level,
        #         "timestamp": datetime.now().isoformat()
        #     },
        #     headers={"Authorization": "Bearer " + API_KEY}
        # )
    except Exception as e:
        logger.error(f"Failed to send alert to marketing system: {e}")

@app.get("/")
def read_root():
    return {"message": "Customer Churn Prediction API"}

@app.get("/health")
def health_check():
    """헬스 체크 엔드포인트"""
    try:
        # 모델이 로드되었는지 확인
        model = get_model()
        # Redis 연결 확인
        redis_client.ping()
        return {"status": "healthy", "model_loaded": True, "redis_connected": True}
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {"status": "unhealthy", "error": str(e)}, 503

@app.post("/predict", response_model=PredictionResponse)
async def predict_churn(
    customer: CustomerFeatures,
    background_tasks: BackgroundTasks,
    model=Depends(get_model)
):
    """고객 이탈 예측 엔드포인트"""
    prediction_start_time = time.time()
    
    try:
        # 1. 캐시된 결과 확인
        cached_prediction = get_cached_prediction(customer.customer_id)
        if cached_prediction:
            logger.info(f"Returning cached prediction for customer {customer.customer_id}")
            # 캐시된 결과를 반환하되, 예측 시간은 현재 시간으로 업데이트
            cached_prediction["prediction_time"] = datetime.now().isoformat()
            return PredictionResponse(**cached_prediction)
        
        # 2. 특성 전처리
        features, customer_id = preprocess_features(customer)
        
        # 3. 예측 수행
        churn_probability = model.predict(features)[0]
        
        # 4. 결과 후처리
        risk_level = "high" if churn_probability > 0.7 else "medium" if churn_probability > 0.3 else "low"
        
        # 5. 응답 생성
        prediction_id = f"pred_{int(time.time())}_{customer.customer_id}"
        response = {
            "customer_id": customer.customer_id,
            "churn_probability": float(churn_probability),
            "churn_risk_level": risk_level,
            "model_version": getattr(model, 'version', 'unknown'),
            "prediction_id": prediction_id,
            "prediction_time": datetime.now().isoformat()
        }
        
        # 6. 결과 캐싱
        background_tasks.add_task(cache_prediction, customer.customer_id, response)
        
        # 7. 고위험 고객 마케팅 알림
        if risk_level == "high":
            background_tasks.add_task(
                send_to_marketing_system,
                customer.customer_id,
                churn_probability,
                risk_level
            )
        
        # 8. 메트릭 업데이트
        prediction_label = "churn" if churn_probability > 0.5 else "no_churn"
        PREDICTION_COUNT.labels(result=prediction_label).inc()
        
        # 9. 응답 반환
        return PredictionResponse(**response)
    
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    
    finally:
        # 지연 시간 측정 및 기록
        prediction_time = time.time() - prediction_start_time
        PREDICTION_LATENCY.observe(prediction_time)

# 애플리케이션 시작 시 필요한 초기화
@app.on_event("startup")
async def startup_event():
    """애플리케이션 시작 시 실행되는 작업"""
    logger.info("Starting Churn Prediction API")
    # 모델 미리 로드
    get_model()

# 애플리케이션 종료 시 정리 작업
@app.on_event("shutdown")
async def shutdown_event():
    """애플리케이션 종료 시 실행되는 작업"""
    logger.info("Shutting down Churn Prediction API")
모니터링 및 재훈련 파이프라인
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# 모델 모니터링 및 재훈련 파이프라인 예시 코드
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from scipy.stats import ks_2samp
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
import logging
import time
from datetime import datetime, timedelta
import schedule
import requests
import json
import os

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# MLflow 설정
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
model_name = "customer_churn_predictor"

# 알림 설정
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
EMAIL_API_URL = os.getenv("EMAIL_API_URL")
EMAIL_API_KEY = os.getenv("EMAIL_API_KEY")
TEAM_EMAIL = "ai-team@example.com"

def load_reference_data():
    """기준 데이터 로드"""
    try:
        # 기준 데이터 경로
        reference_features_path = "data/reference/customer_features.csv"
        reference_predictions_path = "data/reference/model_predictions.csv"
        
        # 기준 특성 데이터 로드
        reference_features = pd.read_csv(reference_features_path)
        
        # 기준 예측 및 실제 결과 로드
        reference_predictions = pd.read_csv(reference_predictions_path)
        
        return reference_features, reference_predictions
    
    except Exception as e:
        logger.error(f"Error loading reference data: {e}")
        raise

def load_production_data(days=7):
    """최근 프로덕션 데이터 로드"""
    try:
        # 최근 7일간의 데이터 로드
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        
        # 실제 구현에서는 데이터베이스 쿼리 또는 로그 분석 시스템에서 데이터 추출
        # 예시를 위한 가상 데이터 경로
        production_data_path = f"data/production/customer_data_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.csv"
        
        # 프로덕션 데이터 로드
        production_data = pd.read_csv(production_data_path)
        
        return production_data
    
    except Exception as e:
        logger.error(f"Error loading production data: {e}")
        raise

def detect_data_drift(reference_data, production_data, threshold=0.05):
    """데이터 드리프트 감지"""
    try:
        drift_detected = False
        drift_features = []
        
        # 수치형 특성에 대해 Kolmogorov-Smirnov 테스트 수행
        numeric_features = ['subscription_length', 'monthly_charges', 'total_charges', 
                           'tech_support_calls', 'last_active_days']
        
        for feature in numeric_features:
            if feature in reference_data.columns and feature in production_data.columns:
                # KS 테스트로 두 분포의 차이 검정
                ks_stat, p_value = ks_2samp(reference_data[feature].dropna(), 
                                           production_data[feature].dropna())
                
                logger.info(f"KS test for {feature}: statistic={ks_stat:f}, p-value={p_value:f}")
                
                # p-value가 임계값보다 작으면 드리프트 감지
                if p_value < threshold:
                    drift_detected = True
                    drift_features.append({
                        'feature': feature,
                        'ks_statistic': ks_stat,
                        'p_value': p_value
                    })
        
        # 범주형 특성에 대한 분포 비교
        categorical_features = [col for col in reference_data.columns 
                               if col.startswith('contract_type_') or col.startswith('payment_method_')]
        
        for feature in categorical_features:
            if feature in reference_data.columns and feature in production_data.columns:
                # 범주 비율 계산
                ref_ratio = reference_data[feature].mean()
                prod_ratio = production_data[feature].mean()
                
                # 비율 차이가 임계값보다 크면 드리프트 감지
                if abs(ref_ratio - prod_ratio) > threshold:
                    drift_detected = True
                    drift_features.append({
                        'feature': feature,
                        'reference_ratio': ref_ratio,
                        'production_ratio': prod_ratio,
                        'difference': abs(ref_ratio - prod_ratio)
                    })
        
        return drift_detected, drift_features
    
    except Exception as e:
        logger.error(f"Error detecting data drift: {e}")
        raise

def evaluate_model_performance(predictions_df):
    """모델 성능 평가"""
    try:
        # 예측 및 실제 결과 추출
        y_true = predictions_df['actual_churn']
        y_pred = predictions_df['predicted_churn']
        y_prob = predictions_df['churn_probability']
        
        # 평가 메트릭 계산
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred),
            'recall': recall_score(y_true, y_pred),
            'f1_score': f1_score(y_true, y_pred),
            'roc_auc': roc_auc_score(y_true, y_prob)
        }
        
        logger.info(f"Model performance metrics: {metrics}")
        return metrics
    
    except Exception as e:
        logger.error(f"Error evaluating model performance: {e}")
        raise

def detect_performance_drift(baseline_metrics, current_metrics, threshold=0.05):
    """성능 드리프트 감지"""
    try:
        performance_drift = False
        drift_metrics = []
        
        for metric, value in current_metrics.items():
            if metric in baseline_metrics:
                diff = baseline_metrics[metric] - value
                
                # 성능 저하가 임계값보다 크면 드리프트 감지
                if diff > threshold:
                    performance_drift = True
                    drift_metrics.append({
                        'metric': metric,
                        'baseline': baseline_metrics[metric],
                        'current': value,
                        'difference': diff
                    })
        
        return performance_drift, drift_metrics
    
    except Exception as e:
        logger.error(f"Error detecting performance drift: {e}")
        raise

def send_alert(title, message, level='info'):
    """알림 전송"""
    try:
        # Slack 웹훅으로 알림 전송
        if SLACK_WEBHOOK_URL:
            slack_payload = {
                'text': f"*{title}*\n{message}",
                'username': 'Model Monitoring Bot',
                'icon_emoji': ':robot_face:' if level == 'info' else ':warning:'
            }
            
            requests.post(SLACK_WEBHOOK_URL, json=slack_payload)
        
        # 심각한 알림은 이메일로도 전송
        if level == 'critical' and EMAIL_API_URL and EMAIL_API_KEY:
            email_payload = {
                'to': TEAM_EMAIL,
                'subject': f"[URGENT] {title}",
                'body': message,
                'api_key': EMAIL_API_KEY
            }
            
            requests.post(EMAIL_API_URL, json=email_payload)
        
        logger.info(f"Alert sent: {title}")
    
    except Exception as e:
        logger.error(f"Error sending alert: {e}")

def trigger_model_retraining():
    """모델 재훈련 트리거"""
    try:
        # 재훈련 파이프라인 API 호출
        retraining_api_url = "http://airflow-api:8080/api/v1/dags/customer_churn_retraining/dagRuns"
        payload = {
            'conf': {'triggered_by': 'monitoring_system'}
        }
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f"Bearer {os.getenv('AIRFLOW_API_KEY')}"
        }
        
        response = requests.post(retraining_api_url, json=payload, headers=headers)
        response.raise_for_status()
        
        run_id = response.json()['dag_run_id']
        logger.info(f"Model retraining triggered successfully. Run ID: {run_id}")
        
        # 알림 전송
        send_alert(
            "Model Retraining Triggered",
            f"Customer churn model retraining has been triggered due to detected drift. Run ID: {run_id}"
        )
        
        return run_id
    
    except Exception as e:
        logger.error(f"Error triggering model retraining: {e}")
        send_alert(
            "Model Retraining Failed",
            f"Failed to trigger customer churn model retraining: {str(e)}",
            level='critical'
        )
        raise

def run_monitoring_pipeline():
    """모니터링 파이프라인 실행"""
    try:
        logger.info("Starting model monitoring pipeline…")
        
        # 1. 기준 데이터 로드
        reference_features, reference_predictions = load_reference_data()
        
        # 2. 프로덕션 데이터 로드
        production_data = load_production_data(days=7)
        
        # 3. 데이터 드리프트 감지
        data_drift_detected, drift_features = detect_data_drift(reference_features, production_data)
        
        if data_drift_detected:
            logger.warning("Data drift detected!")
            drift_message = "Data drift detected in the following features:\n"
            for feature in drift_features:
                drift_message += f"- {feature['feature']}\n"
            
            send_alert("Data Drift Detected", drift_message, level='warning')
        
        # 4. 현재 프로덕션 모델 성능 평가
        # 실제 구현에서는 라벨이 지연되어 도착할 수 있음을 고려해야 함
        production_predictions = pd.read_csv("data/production/recent_predictions_with_actuals.csv")
        current_metrics = evaluate_model_performance(production_predictions)
        
        # 5. 기준 성능 지표 로드
        latest_model_version = client.get_latest_versions(model_name, stages=["Production"])[0]
        run_id = latest_model_version.run_id
        run = client.get_run(run_id)
        baseline_metrics = {k: v for k, v in run.data.metrics.items() 
                           if k in ['accuracy', 'precision', 'recall', 'f1_score', 'roc_auc']}
        
        # 6. 성능 드리프트 감지
        performance_drift, drift_metrics = detect_performance_drift(baseline_metrics, current_metrics)
        
        if performance_drift:
            logger.warning("Performance drift detected!")
            drift_message = "Model performance drift detected:\n"
            for metric in drift_metrics:
                drift_message += f"- {metric['metric']}: {metric['baseline']:f}{metric['current']:f}{metric['difference']:f})\n"
            
            send_alert("Performance Drift Detected", drift_message, level='warning')
        
        # 7. 재훈련 필요 여부 결정
        retrain_threshold = 0.1  # 성능 10% 이상 저하 시 재훈련
        significant_performance_drift = any(m['difference'] > retrain_threshold for m in drift_metrics) if performance_drift else False
        
        if data_drift_detected and significant_performance_drift:
            logger.warning("Significant drift detected. Triggering model retraining…")
            trigger_model_retraining()
        elif data_drift_detected or significant_performance_drift:
            logger.info("Drift detected but not significant enough for automatic retraining.")
            send_alert(
                "Drift Detected - Manual Review Required",
                "Data or performance drift has been detected but is not significant enough for automatic retraining. Please review the monitoring dashboard.",
                level='info'
            )
        else:
            logger.info("No significant drift detected. Model is performing well.")
        
        # 8. 결과 저장
        monitoring_results = {
            'timestamp': datetime.now().isoformat(),
            'data_drift_detected': data_drift_detected,
            'performance_drift_detected': performance_drift,
            'current_metrics': current_metrics,
            'baseline_metrics': baseline_metrics,
            'retrain_triggered': data_drift_detected and significant_performance_drift
        }
        
        with open(f"logs/monitoring_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
            json.dump(monitoring_results, f, indent=2)
        
        logger.info("Model monitoring pipeline completed successfully.")
    
    except Exception as e:
        logger.error(f"Error in monitoring pipeline: {e}")
        send_alert(
            "Monitoring Pipeline Error",
            f"The model monitoring pipeline encountered an error: {str(e)}",
            level='critical'
        )
        raise

def schedule_monitoring():
    """모니터링 작업 스케줄링"""
    # 매일 자정에 모니터링 파이프라인 실행
    schedule.every().day.at("00:00").do(run_monitoring_pipeline)
    
    # 매주 월요일에 상세 보고서 생성
    # schedule.every().monday.at("09:00").do(generate_weekly_report)
    
    logger.info("Monitoring tasks scheduled.")
    
    while True:
        schedule.run_pending()
        time.sleep(60)  # 1분마다 스케줄 확인

if __name__ == "__main__":
    # 스크립트 직접 실행 시 즉시 실행 후 스케줄링
    run_monitoring_pipeline()
    schedule_monitoring()

실제 AI 엔지니어링 워크플로우

위에서 살펴본 고객 이탈 예측 시스템 예시를 통해 AI 엔지니어링의 전체 워크플로우를 이해할 수 있다.
이 워크플로우는 다음과 같은 단계로 구성된다:

  1. 문제 정의 및 범위 설정
    • 비즈니스 문제 이해 및 AI 접근 방식 결정
    • 성공 기준 및 제약 조건 설정
  2. 데이터 파이프라인 구축
    • 데이터 소스 연결 및 수집 자동화
    • 데이터 검증 및 전처리 단계 구현
    • 특성 엔지니어링 및 데이터 변환
  3. 모델 개발 및 실험
    • 다양한 모델 아키텍처 및 알고리즘 실험
    • 하이퍼파라미터 최적화 및 교차 검증
    • 실험 추적 및 버전 관리
  4. 모델 배포 인프라 구축
    • 모델 패키징 및 컨테이너화
    • API 설계 및 개발
    • 확장성 및 성능 최적화
  5. CI/CD 파이프라인 구축
    • 코드 및 모델 테스트 자동화
    • 지속적 통합 및 배포 워크플로우 설정
    • 환경별 설정 관리
  6. 모니터링 시스템 구축
    • 모델 성능 메트릭 정의 및 추적
    • 데이터 드리프트 감지 메커니즘 구현
    • 알림 및 보고 시스템 설정
  7. 피드백 루프 및 재훈련
    • 모델 성능 저하 감지 시 자동 재훈련
    • A/B 테스트를 통한 모델 업데이트 검증
    • 지속적인 모델 개선

AI 엔지니어링의 미래 동향

  1. 자동화 및 Low-Code/No-Code 솔루션

    • AutoML의 발전: 모델 선택, 하이퍼파라미터 최적화, 특성 엔지니어링의 자동화
    • ML 파이프라인 자동화: 데이터 준비부터 모델 배포까지 자동화된 파이프라인
    • Low-Code AI 플랫폼: 코딩 지식이 적은 사용자도 AI 시스템을 구축할 수 있는 도구
  2. 분산 및 연합 학습

    • 에지 AI: 에지 디바이스에서의 AI 모델 실행 및 학습
    • 연합 학습(Federated Learning): 중앙 데이터 저장소 없이 분산된 디바이스에서 학습
    • 프라이버시 보존 AI: 개인 정보를 보호하면서 학습 및 추론
  3. 생성형 AI 및 대규모 언어 모델

    • 생성형 AI 파이프라인: 텍스트, 이미지, 오디오 생성 모델의 개발 및 배포
    • LLM 응용 개발: 대규모 언어 모델을 활용한 애플리케이션 개발
    • 파인튜닝 및 추론 최적화: 대규모 모델의 효율적인 맞춤화 및 추론
  4. 책임 있는 AI 엔지니어링

    • AI 거버넌스: AI 시스템의 개발 및 배포를 관리하는 프레임워크
    • 공정성 및 편향 감지: AI 시스템의 공정성을 보장하기 위한 방법론
    • 설명 가능한 AI(XAI): AI 모델의 결정을 이해하고 설명하는 기법
    • 지속 가능한 AI: 환경적 영향을 최소화하는 AI 시스템 개발
  5. AI 시스템 보안

    • 적대적 공격 방어: 모델 강인성 향상 및 적대적 공격 방어
    • AI 모델 도난 방지: 모델 지적 재산권 보호
    • 데이터 누출 감지: 모델을 통한 훈련 데이터 유출 방지

참고 및 출처


Roadmap

Roadmap - AI Engineer


학습 로드맵

📘 이론적으로 꼭 알아야 할 내용 (Foundation for AI System Design)

영역내용설명
✅ AI 시스템 설계 원칙AI를 하나의 엔지니어링 시스템으로 접근하는 방법론성능, 확장성, 신뢰성, 유지보수성을 고려한 설계
✅ 머신러닝/딥러닝 기초지도학습, 비지도학습, 강화학습각 알고리즘의 구조, 한계, 적용 조건 등 이해
✅ 데이터 엔지니어링 기초ETL, 데이터 전처리, 피처 엔지니어링좋은 AI는 좋은 데이터에서 출발한다는 기본 원리
✅ 모델 서빙 및 배포 전략온라인/오프라인 서빙, REST API, ONNX 등모델을 실제 서비스에 적용하는 전략적 접근
✅ AI 윤리 및 안전성바이어스, 설명가능성, 책임 있는 AI신뢰받는 AI 시스템을 위한 필수 요소
✅ 소프트웨어 아키텍처 이론모듈화, 마이크로서비스, 이벤트 기반 구조AI 시스템의 확장성과 유지보수성 확보
✅ 실험 관리와 버전 관리실험 추적, 하이퍼파라미터 기록, 모델 버전 관리reproducibility 확보를 위한 원칙
✅ MLOps 원리DevOps + ML 모델 운영 관리CI/CD + 모델 학습/배포/모니터링 자동화 이론

📌 목적: AI를 “개발 가능한 시스템"으로 설계할 수 있는 기초 확보

🛠️ 실무적으로 꼭 알아야 할 내용 (Implementation & Operation)

영역내용설명
✅ Python 기반 ML 실습scikit-learn, PyTorch, TensorFlow 등모델 학습/테스트/튜닝 실습 역량
✅ 데이터 파이프라인 구축Pandas, SQL, Airflow, Spark데이터 수집/가공/저장 자동화 구축
✅ 모델 배포 실습FastAPI, Docker, TorchServeRESTful API 기반 실시간 모델 서빙 구현
✅ 실험 관리 도구 사용MLflow, Weights & Biases실험/버전 관리, 성능 트래킹 능력 확보
✅ MLOps 파이프라인 구성Kubeflow, SageMaker, Vertex AI 등모델 학습/배포/모니터링 자동화 실습
✅ CI/CD 적용GitHub Actions, Jenkins, Docker Compose코드/모델의 배포 자동화 및 테스트 연동
✅ 모니터링 & 알림 시스템Prometheus, Grafana, 로그 추적모델 드리프트 감지 및 운영 안정성 확보
✅ API 연동 및 시스템 통합모델 결과를 프론트엔드/백엔드와 통합실제 제품 서비스에 모델 내장하는 실무 능력

📌 목적: 모델 개발 → 배포 → 운영까지 엔드 투 엔드로 시스템을 구현하는 역량 확보

📐 이론 ↔ 실무 연결 예시

이론 요소실무 연결
MLOps 원리Kubeflow + MLflow를 활용한 자동 학습/배포 파이프라인 구성
모델 서빙 전략Dockerized REST API로 모델 서빙 후, API Gateway로 통합
AI 윤리Explainable AI 도구(LIME, SHAP)를 사용해 판단 근거 시각화
데이터 엔지니어링Airflow + Spark로 대용량 데이터 파이프라인 구성
실험 관리W&B를 통해 실험 기록, 비교, 시각화 자동화 적용

🧭 추천 학습 순서 (IT 개발자 입장 기준)

  1. AI 엔지니어링 개요 및 구성요소 파악
    (설계 → 개발 → 배포 → 운영 전체 흐름 이해)

  2. 기초 ML 알고리즘 & Python 기반 모델 구현 학습
    (scikit-learn → PyTorch 실습 중심)

  3. 데이터 파이프라인 및 전처리 자동화 학습

  4. FastAPI + Docker 기반 모델 서빙 구조 실습

  5. MLflow/W&B 등 실험 관리 적용 및 버전 관리

  6. MLOps 구성 및 CI/CD 적용

  7. 모니터링 & 안전성 도입 → AI 시스템 통합 운영

🏁 학습의 궁극적인 목적

“AI 모델"을 만드는 것을 넘어서,
**“현실 문제를 해결할 수 있는 완전한 AI 제품을 만들고 운영할 수 있는 개발자”**가 되는 것