비결정성 (Non-determinism)

알고리즘이나 시스템에서 동일한 입력에 대해 매번 다른 과정을 거쳐 다른 결과를 도출할 수 있는 특성

특징

  1. 다중 선택: 각 단계에서 여러 가능한 다음 단계 중 하나를 임의로 선택할 수 있다.
  2. 병렬 처리: 여러 가능한 경로를 동시에 탐색할 수 있는 개념적 모델을 제공한다.
  3. 결정성과의 차이: 결정성 알고리즘은 각 단계에서 다음 단계가 유일하게 결정되는 반면, 비결정성 알고리즘은 그렇지 않다.

비결정성 알고리즘

비결정성 알고리즘은 다음과 같은 특징을 가진다.

  1. 실행 경로의 다양성: 동일한 입력에 대해 여러 가능한 실행 경로가 존재한다.
  2. 비결정도: 각 단계에서 선택 가능한 다음 단계의 최대 개수를 비결정도라고 한다.
  3. 계산 능력: 비결정성 알고리즘과 결정성 알고리즘의 계산 능력은 동일하다.

응용

  1. NP 문제: 비결정성 알고리즘으로 다항식 시간 내에 해결 가능한 결정형 문제를 NP 문제라고 한다.
  2. 유한 오토마타: 비결정적 유한 오토마타(NFA)는 탐색과 백트래킹 기법을 통해 모든 가능한 선택을 시도한다.
  3. 탐색 및 백트래킹 알고리즘: 비결정성은 여러 가지 경우를 순차적으로 계산하며 최적값을 갱신하는 백트래킹 기법의 모델로 사용된다.

장점

  1. 간결한 표현: 복잡한 언어나 시스템을 비결정성을 통해 더 간결하게 정의할 수 있다.
  2. 논증 간소화: 비결정성을 통해 공식적인 논증을 간단히 할 수 있다.
  3. 모델링 유연성: 실제 세계의 불확실성이나 복잡성을 모델링하는 데 유용하다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import random
import threading

# 결정적인 함수의 예
def deterministic_sum(a, b):
    return a + b  # 항상 같은 입력에 대해 같은 결과

# 비결정적인 함수의 예
def non_deterministic_choice(options):
    return random.choice(options)  # 매번 다른 결과가 나올 수 있음

# 비결정적인 멀티스레딩 예제
shared_counter = 0
lock = threading.Lock()

def increment_counter():
    global shared_counter
    current = shared_counter
    # 의도적으로 경쟁 조건을 만듦
    threading.Thread(target=lambda: None).start()
    shared_counter = current + 1

def run_concurrent_increments(n):
    threads = []
    for _ in range(n):
        t = threading.Thread(target=increment_counter)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    return shared_counter

다양한 상황에서 발생할 수 있다:

  1. 병렬 처리와 동시성
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# 동시성으로 인한 비결정적 결과
def parallel_processing_example():
    results = []
    
    def worker():
        # 시간이 걸리는 작업 시뮬레이션
        time.sleep(random.random())
        results.append(threading.current_thread().name)
    
    threads = [
        threading.Thread(target=worker)
        for _ in range(5)
    ]
    
    for t in threads:
        t.start()
    
    for t in threads:
        t.join()
        
    return results  # 스레드 완료 순서가 매번 다를 수 있음
  1. 네트워크 통신
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def fetch_data(url):
    try:
        # 네트워크 지연 시뮬레이션
        await asyncio.sleep(random.random())
        return f"Data from {url}"
    except:
        return None

async def fetch_multiple():
    urls = ['url1', 'url2', 'url3']
    tasks = [fetch_data(url) for url in urls]
    # 완료 순서가 비결정적
    return await asyncio.gather(*tasks)
  1. 리소스 경쟁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class SharedResource:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()
    
    def add_data(self, item):
        with self.lock:
            # 경쟁 상태 시뮬레이션
            current = self.data.copy()
            time.sleep(random.random() * 0.1)
            current.append(item)
            self.data = current

비결정성을 다루는 전략들

  1. 동기화 메커니즘 사용:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class DeterministicCounter:
    def __init__(self):
        self._count = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self._count += 1
            
    def get_count(self):
        with self._lock:
            return self._count
  1. 시드(Seed) 설정으로 재현 가능한 무작위성 구현:
1
2
3
4
5
def reproducible_random_sequence(seed=42):
    random.seed(seed)
    results = [random.randint(1, 100) for _ in range(10)]
    random.seed()  # 시드 초기화
    return results
  1. 테스트를 위한 결정적 동작 모드:
1
2
3
4
5
6
7
8
class TestableSystem:
    def __init__(self, deterministic_mode=False):
        self.deterministic_mode = deterministic_mode
        
    def get_random_value(self):
        if self.deterministic_mode:
            return 42  # 테스트용 고정값
        return random.randint(1, 100)

문제가 되는 상황들과 그 해결 방안

  1. 분산 시스템에서의 시간 동기화
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import time

class LogicalClock:
    def __init__(self):
        self.timestamp = 0
        self.lock = threading.Lock()
    
    def get_time(self):
        with self.lock:
            self.timestamp += 1
            return self.timestamp
    
    def update(self, received_time):
        with self.lock:
            self.timestamp = max(self.timestamp, received_time) + 1
  1. 데이터베이스 트랜잭션
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class TransactionManager:  
    def __init__(self):  
        self.locks = {}
        
    def begin_transaction(self):
        transaction_id = str(uuid.uuid4())
        self.locks[transaction_id] = set()
        return transaction_id
        
    def acquire_lock(self, transaction_id, resource_id):
        if resource_id in self.locks[transaction_id]:
            return True
        # 2단계 락킹 프로토콜 구현
        self.locks[transaction_id].add(resource_id)
        return True

고려사항

  • 분산 시스템 설계
  • 병렬 알고리즘 구현
  • 네트워크 프로토콜 개발
  • 멀티스레드 프로그래밍
  • 암호화 시스템 구현

참고 및 출처