기아 상태 (Starvation)

운영 체제 및 동시성 프로그래밍에서 중요한 문제로, 특정 프로세스가 필요한 자원을 지속적으로 얻지 못해 실행되지 못하는 상황.
자원 관리 문제로, 낮은 우선순위 프로세스가 높은 우선순위 프로세스에 의해 자원이 계속 점유되어 무기한 대기하는 상황으로 주로 우선순위 기반 스케줄링에서 발생하며, 시스템 성능과 공정성에 부정적인 영향을 미친다.

Starvation in Operating System
Source: https://www.javatpoint.com/what-is-starvation-in-operating-system

발생 조건

기아 상태가 발생하기 위한 주요 조건은 다음과 같다:

  1. 우선순위 기반 스케줄링: 높은 우선순위 프로세스가 계속 실행되면서 낮은 우선순위 프로세스가 실행되지 못함.
  2. 자원 부족: 시스템 자원이 제한적일 때 특정 프로세스가 지속적으로 자원을 얻지 못함.
  3. 비공정한 스케줄링 알고리즘: 공정성을 고려하지 않는 알고리즘이 낮은 우선순위 프로세스를 무시함.
  4. 임계 구역 점유: 특정 프로세스가 임계 구역을 오래 점유하여 다른 프로세스의 접근을 차단.

해결책 및 방지책

기아 상태를 해결하거나 방지하기 위한 방법은 다음과 같다:

  1. Aging(에이징) 기법:

    • 대기 시간이 길어질수록 프로세스의 우선순위를 점진적으로 증가시켜 실행 기회를 보장한다.
    • 예: 일정 시간마다 대기 중인 프로세스의 우선순위를 1씩 증가.
  2. 라운드 로빈 스케줄링:

    • 모든 프로세스에 고정된 시간 슬라이스를 할당하여 공정하게 CPU 시간을 분배한다.
  3. 공정 스케줄러(Fair Scheduler):

    • 자원의 공정한 분배를 보장하는 알고리즘 사용.
    • 예: Completely Fair Scheduler(CFS).
  4. 우선순위 부스트(Priority Boosting):

    • 오랫동안 대기한 프로세스의 우선순위를 일시적으로 높여 실행 가능성을 증가.
  5. 자원 관리 개선:

    • 세마포어, 뮤텍스 등 동기화 메커니즘을 사용하여 자원의 공정한 사용 보장.
  6. 랜덤 선택 회피:

    • 랜덤으로 프로세스를 선택하지 않고 공정성을 고려한 알고리즘 사용.

실제 시스템에서의 예방책

  1. Aging 적용: 오래 대기한 프로세스의 우선순위를 자동으로 조정.
  2. 멀티레벨 피드백 큐(Multilevel Feedback Queue): 대기 시간이 긴 프로세스를 더 높은 우선순위 큐로 이동.
  3. 자원 모니터링 도구 사용: 시스템 자원의 사용 현황을 실시간으로 추적하여 기아 상태를 사전에 탐지.
  4. 공정성 테스트: 다양한 워크로드 시나리오에서 스케줄링 알고리즘을 테스트하여 공정성을 검증.

고려사항 및 주의사항

  1. 성능 저하: Aging이나 Priority Boosting은 성능에 영향을 미칠 수 있으므로 적절히 조율해야 한다.
  2. 데드락과 구분: 기아 상태와 데드락은 다른 문제로, 데드락은 모든 프로세스가 멈추는 반면 기아 상태는 일부만 영향을 받음.
  3. 우선순위 역전(Priority Inversion): 낮은 우선순위 작업이 높은 우선순위 작업을 차단하지 않도록 주의해야 함.

모범 사례

  1. Aging 기법을 기본적으로 적용하여 모든 프로세스에 실행 기회를 제공.
  2. 라운드 로빈과 같은 공정한 스케줄링 정책 채택.
  3. 시스템 로그와 모니터링 도구를 활용하여 기아 상태를 조기에 탐지.

구현 예시

기아 상태를 시뮬레이션하고 이를 해결하는 파이썬 예제

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import threading
import queue
import time
from dataclasses import dataclass
from typing import Optional
import random

@dataclass
class Task:
    id: int
    priority: int
    creation_time: float
    wait_time: float = 0
    
    def update_wait_time(self):
        self.wait_time = time.time() - self.creation_time
        
    def should_boost_priority(self) -> bool:
        """우선순위 부스팅이 필요한지 확인"""
        return self.wait_time > 10  # 10초 이상 대기시 부스팅

class FairScheduler:
    def __init__(self):
        self.high_priority_queue = queue.PriorityQueue()
        self.low_priority_queue = queue.PriorityQueue()
        self.lock = threading.Lock()
        self.running = True
        
    def add_task(self, task: Task):
        """작업 추가"""
        with self.lock:
            if task.priority > 5:
                self.high_priority_queue.put((-task.priority, task))
            else:
                self.low_priority_queue.put((-task.priority, task))
                
    def process_tasks(self):
        """작업 처리"""
        while self.running:
            next_task = self._get_next_task()
            if next_task:
                self._execute_task(next_task)
                
    def _get_next_task(self) -> Optional[Task]:
        """다음 처리할 작업 선택"""
        with self.lock:
            # 모든 낮은 우선순위 작업 검사 및 부스팅
            low_priority_tasks = []
            while not self.low_priority_queue.empty():
                _, task = self.low_priority_queue.get()
                task.update_wait_time()
                if task.should_boost_priority():
                    print(f"Boosting priority of task {task.id}")
                    task.priority = 10  # 우선순위 부스팅
                    self.high_priority_queue.put((-task.priority, task))
                else:
                    low_priority_tasks.append((-task.priority, task))
                    
            # 낮은 우선순위 작업 다시 큐에 넣기
            for task_tuple in low_priority_tasks:
                self.low_priority_queue.put(task_tuple)
                
            # 높은 우선순위 큐에서 먼저 확인
            if not self.high_priority_queue.empty():
                return self.high_priority_queue.get()[1]
                
            # 낮은 우선순위 큐에서 확인
            if not self.low_priority_queue.empty():
                return self.low_priority_queue.get()[1]
                
            return None
            
    def _execute_task(self, task: Task):
        """작업 실행"""
        print(f"Executing task {task.id} (Priority: {task.priority}, "
              f"Wait time: {task.wait_time:f}s)")
        # 작업 실행 시뮬레이션
        time.sleep(random.uniform(0.1, 0.5))

def simulate_starvation():
    """기아 상태 시뮬레이션"""
    scheduler = FairScheduler()
    
    # 스케줄러 실행 스레드
    scheduler_thread = threading.Thread(target=scheduler.process_tasks)
    scheduler_thread.start()
    
    # 높은 우선순위 작업 지속적 생성
    def generate_high_priority_tasks():
        task_id = 0
        while scheduler.running:
            task = Task(id=task_id, 
                       priority=random.randint(7, 10),
                       creation_time=time.time())
            scheduler.add_task(task)
            task_id += 1
            time.sleep(random.uniform(0.1, 0.3))
            
    # 낮은 우선순위 작업 생성
    def generate_low_priority_tasks():
        task_id = 1000  # 구분을 위한 시작 ID
        while scheduler.running:
            task = Task(id=task_id,
                       priority=random.randint(1, 4),
                       creation_time=time.time())
            scheduler.add_task(task)
            task_id += 1
            time.sleep(random.uniform(0.5, 1.0))
    
    # 작업 생성 스레드 시작
    high_priority_generator = threading.Thread(target=generate_high_priority_tasks)
    low_priority_generator = threading.Thread(target=generate_low_priority_tasks)
    
    high_priority_generator.start()
    low_priority_generator.start()
    
    # 일정 시간 후 시뮬레이션 종료
    time.sleep(30)
    scheduler.running = False
    
    # 모든 스레드 종료 대기
    scheduler_thread.join()
    high_priority_generator.join()
    low_priority_generator.join()

if __name__ == "__main__":
    simulate_starvation()

실제 시스템에서의 해결 전략

  1. Aging 통합: 운영 체제 수준에서 Aging 알고리즘을 기본 스케줄러에 통합.
  2. 동적 우선순위 조정: 워크로드와 대기 시간에 따라 실시간으로 우선순위를 조정.
  3. 자원 관리 정책 개선: 세마포어, 뮤텍스 등 동기화 메커니즘을 효율적으로 설계.
  4. 모니터링 및 알림 시스템 구축: 기아 상태 발생 가능성을 실시간으로 감지하고 관리자에게 알림.

참고 및 출처