1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
| import threading
from abc import ABC, abstractmethod
from queue import Queue
from typing import List
import time
import random
# 작업을 정의하는 기본 클래스
class Task:
def __init__(self, task_id: int, data: List[int]):
self.task_id = task_id
self.data = data
self.result = None
# 슬레이브의 추상 클래스
class Slave(ABC):
def __init__(self, slave_id: int):
self.slave_id = slave_id
self.is_busy = False
@abstractmethod
def process_task(self, task: Task) -> None:
pass
# 구체적인 슬레이브 구현 - 숫자 배열의 합을 계산
class SumCalculatorSlave(Slave):
def process_task(self, task: Task) -> None:
print(f"Slave {self.slave_id} starting task {task.task_id}")
# 실제 작업 처리를 시뮬레이션하기 위한 지연
time.sleep(random.uniform(0.5, 2.0))
task.result = sum(task.data)
print(f"Slave {self.slave_id} completed task {task.task_id}, result: {task.result}")
# 마스터 클래스
class Master:
def __init__(self, num_slaves: int):
# 슬레이브 풀 초기화
self.slaves = [SumCalculatorSlave(i) for i in range(num_slaves)]
# 작업 큐
self.task_queue = Queue()
# 완료된 작업 저장
self.completed_tasks = {}
# 작업 분배를 위한 쓰레드
self.distribution_thread = threading.Thread(target=self._distribute_tasks)
self.is_running = True
def start(self):
"""마스터 시작"""
print("Master starting…")
self.distribution_thread.start()
def stop(self):
"""마스터 종료"""
print("Master stopping…")
self.is_running = False
self.distribution_thread.join()
def submit_task(self, task: Task):
"""새로운 작업 제출"""
print(f"Submitting task {task.task_id}")
self.task_queue.put(task)
def get_result(self, task_id: int) -> int:
"""작업 결과 조회"""
while task_id not in self.completed_tasks:
time.sleep(0.1) # 결과가 준비될 때까지 대기
return self.completed_tasks[task_id]
def _distribute_tasks(self):
"""작업 분배 로직"""
while self.is_running:
try:
# 대기 중인 작업이 있는지 확인
task = self.task_queue.get(timeout=1.0)
# 사용 가능한 슬레이브 찾기
slave = self._get_available_slave()
if slave:
# 작업 처리를 위한 새 쓰레드 시작
threading.Thread(
target=self._process_task_with_slave,
args=(slave, task)
).start()
except Queue.Empty:
continue
def _get_available_slave(self) -> Slave:
"""사용 가능한 슬레이브 찾기"""
for slave in self.slaves:
if not slave.is_busy:
return slave
return None
def _process_task_with_slave(self, slave: Slave, task: Task):
"""슬레이브를 사용하여 작업 처리"""
try:
slave.is_busy = True
slave.process_task(task)
self.completed_tasks[task.task_id] = task.result
finally:
slave.is_busy = False
# 사용 예시
def main():
# 3개의 슬레이브로 마스터 생성
master = Master(num_slaves=3)
master.start()
try:
# 여러 작업 제출
tasks = [
Task(1, [1, 2, 3, 4, 5]),
Task(2, [10, 20, 30, 40, 50]),
Task(3, [100, 200, 300, 400, 500]),
Task(4, [1000, 2000, 3000, 4000, 5000])
]
# 작업 제출
for task in tasks:
master.submit_task(task)
# 결과 수집
for task in tasks:
result = master.get_result(task.task_id)
print(f"Final result for task {task.task_id}: {result}")
# 잠시 대기 후 종료
time.sleep(5)
finally:
master.stop()
if __name__ == "__main__":
main()
|