from typing import Dict, List, Callable
from dataclasses import dataclass, field
from datetime import datetime
import threading
import queue
import uuid

# Private marker that EventBus.shutdown() enqueues to wake the worker thread
# and tell it to stop. Because the queue is FIFO, every event published before
# shutdown() is dispatched before the worker sees this marker.
_SHUTDOWN = object()


def _new_event_id() -> str:
    """Return a fresh random UUID4 string for use as an event identifier."""
    return str(uuid.uuid4())


# Base event class
@dataclass
class Event:
    """A message carried by the event bus.

    Attributes:
        event_type: Name used to route the event to its subscribers.
        timestamp: When the event was created (supplied by the publisher).
        data: Arbitrary payload handed to the handlers.
        event_id: Identifier of this event; generated per instance when the
            caller does not supply one.
    """

    event_type: str
    timestamp: datetime
    data: dict
    # A default_factory is required here: a plain ``= str(uuid.uuid4())``
    # default is evaluated once at class-definition time, which would give
    # every event the same ID.
    event_id: str = field(default_factory=_new_event_id)


# Event bus implementation
class EventBus:
    """In-process publish/subscribe bus with a single background worker.

    Events are queued by :meth:`publish` and delivered, in publication order,
    to the handlers registered for their ``event_type`` on a dedicated thread.
    The worker thread is not a daemon, so :meth:`shutdown` must be called for
    the interpreter to exit.
    """

    def __init__(self):
        # Subscriber list per event type
        self._subscribers: Dict[str, List[Callable]] = {}
        # Pending events (plus the shutdown marker)
        self._event_queue = queue.Queue()
        # Set the flag before the thread exists so the worker can never
        # observe the bus without it.
        self._is_running = True
        # Event processing thread
        self._processing_thread = threading.Thread(target=self._process_events)
        self._processing_thread.start()

    def subscribe(self, event_type: str, handler: Callable):
        """Register ``handler`` to be called with each event of ``event_type``."""
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event: Event):
        """Queue ``event`` for asynchronous delivery on the worker thread.

        Events published after :meth:`shutdown` are queued but never
        dispatched.
        """
        self._event_queue.put(event)

    def _process_events(self):
        """Worker loop: dispatch queued events until shutdown is requested."""
        while True:
            try:
                event = self._event_queue.get(timeout=1.0)
            except queue.Empty:
                # Fallback stop path: honour the flag even if no shutdown
                # marker was enqueued.
                if not self._is_running:
                    break
                continue
            if event is _SHUTDOWN:
                break
            self._dispatch_event(event)

    def _dispatch_event(self, event: Event):
        """Call every handler registered for ``event.event_type``.

        An exception raised by one handler is reported and does not prevent
        the remaining handlers from running.
        """
        # Iterate over a snapshot so that a handler subscribing to the same
        # event type during dispatch cannot extend the sequence being
        # iterated.
        handlers = tuple(self._subscribers.get(event.event_type, ()))
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:  # isolation boundary between handlers
                print(f"Error handling event {event.event_id}: {e}")

    def shutdown(self):
        """Stop the bus after delivering every event published so far.

        Blocks until the worker thread has finished, unless called from the
        worker thread itself (i.e. from inside a handler), in which case it
        only requests the stop. Safe to call more than once.
        """
        self._is_running = False
        # Wake the worker immediately instead of waiting for its poll timeout.
        self._event_queue.put(_SHUTDOWN)
        # A thread cannot join itself; that would raise RuntimeError.
        if threading.current_thread() is not self._processing_thread:
            self._processing_thread.join()


# Example: usage in an order system
class OrderService:
    """Accepts orders and announces them on the event bus."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

    def place_order(self, order_data: dict):
        """Process ``order_data`` and publish an ``OrderPlaced`` event for it."""
        # Order processing logic
        print(f"Processing order: {order_data}")

        # Publish the order-completed event
        event = Event(
            event_type="OrderPlaced",
            timestamp=datetime.now(),
            data=order_data,
        )
        self.event_bus.publish(event)


class InventoryService:
    """Reacts to ``OrderPlaced`` events by updating inventory."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to OrderPlaced events
        self.event_bus.subscribe("OrderPlaced", self.handle_order_placed)

    def handle_order_placed(self, event: Event):
        """Handle an ``OrderPlaced`` event."""
        print(f"Updating inventory for order: {event.data}")
        # Inventory update logic


class NotificationService:
    """Reacts to ``OrderPlaced`` events by sending a notification."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to OrderPlaced events
        self.event_bus.subscribe("OrderPlaced", self.handle_order_placed)

    def handle_order_placed(self, event: Event):
        """Handle an ``OrderPlaced`` event."""
        print(f"Sending notification for order: {event.data}")
        # Notification sending logic


# Usage example
if __name__ == "__main__":
    event_bus = EventBus()
    try:
        order_service = OrderService(event_bus)
        inventory_service = InventoryService(event_bus)
        notification_service = NotificationService(event_bus)

        # Process an order
        order_service.place_order({"order_id": "123", "product": "laptop"})
    finally:
        # Without this the non-daemon worker thread keeps the process alive.
        event_bus.shutdown()