importthreadingimporttimeclassResource:def__init__(self,name):self.name=nameself.lock=threading.Lock()defacquire(self,process_name):print(f"{process_name}가 {self.name} 획득 시도")self.lock.acquire()print(f"{process_name}가 {self.name} 획득 성공")defrelease(self,process_name):print(f"{process_name}가 {self.name} 반환")self.lock.release()defprocess_task(process_name,first_resource,second_resource):"""
교착상태를 발생시키는 프로세스 작업을 시뮬레이션합니다.
각 프로세스는 두 개의 자원을 순차적으로 획득하려 시도합니다.
"""try:# 첫 번째 자원 획득first_resource.acquire(process_name)print(f"{process_name}가 작업 중…")time.sleep(1)# 다른 프로세스가 두 번째 자원을 획득할 시간을 줌# 두 번째 자원 획득 시도second_resource.acquire(process_name)print(f"{process_name}가 모든 자원 획득 성공")# 작업 수행time.sleep(1)# 자원 반환second_resource.release(process_name)first_resource.release(process_name)exceptExceptionase:print(f"{process_name} 오류 발생: {e}")defmain():# 두 개의 자원 생성resource_A=Resource("Resource A")resource_B=Resource("Resource B")# 두 개의 프로세스 생성# Process 1은 A -> B 순서로 자원 획득 시도# Process 2는 B -> A 순서로 자원 획득 시도process1=threading.Thread(target=process_task,args=("Process 1",resource_A,resource_B))process2=threading.Thread(target=process_task,args=("Process 2",resource_B,resource_A))# 프로세스 시작process1.start()process2.start()# 프로세스 종료 대기process1.join()process2.join()if__name__=="__main__":print("교착상태 시뮬레이션 시작")main()print("시뮬레이션 종료")