importthreadingimporttimefromdatetimeimportdatetimefromtypingimportDict,Optionalfromdataclassesimportdataclass@dataclassclassUserProfile:user_id:intname:strlast_updated:datetimeversion:intclassUserProfileService:def__init__(self):# 메인 저장소self._storage:Dict[int,UserProfile]={}# 캐시self._cache:Dict[int,UserProfile]={}self._lock=threading.Lock()defget_profile(self,user_id:int)->Optional[UserProfile]:"""
사용자 프로필을 조회하는 메서드
캐시 확인 후 없으면 저장소에서 조회
"""# 캐시에서 먼저 확인cached_profile=self._cache.get(user_id)ifcached_profile:returncached_profile# 저장소에서 조회withself._lock:profile=self._storage.get(user_id)ifprofile:# 캐시 업데이트self._cache[user_id]=profilereturnprofiledefupdate_profile(self,user_id:int,name:str)->UserProfile:"""
사용자 프로필을 업데이트하는 메서드
버전 관리를 통한 동시성 제어
"""withself._lock:current_profile=self._storage.get(user_id)new_version=(current_profile.version+1)ifcurrent_profileelse1# 새 프로필 생성updated_profile=UserProfile(user_id=user_id,name=name,last_updated=datetime.now(),version=new_version)# 저장소 업데이트self._storage[user_id]=updated_profile# 캐시 무효화ifuser_idinself._cache:delself._cache[user_id]returnupdated_profiledefsimulate_concurrent_updates():"""
동시성 업데이트 시뮬레이션
"""service=UserProfileService()defupdate_worker(user_id:int,name:str):try:profile=service.update_profile(user_id,name)print(f"Updated profile: {profile}")exceptExceptionase:print(f"Error updating profile: {e}")# 여러 스레드에서 동시에 업데이트 시도threads=[]foriinrange(5):t=threading.Thread(target=update_worker,args=(1,f"User_{i}"))threads.append(t)t.start()# 모든 스레드 완료 대기fortinthreads:t.join()# 최종 상태 확인final_profile=service.get_profile(1)print(f"\nFinal profile state: {final_profile}")if__name__=="__main__":simulate_concurrent_updates()
fromsqlalchemyimportcreate_engine,Column,Integer,String,DateTimefromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ormimportsessionmakerfromsqlalchemy.sqlimporttextBase=declarative_base()classUser(Base):__tablename__='users'id=Column(Integer,primary_key=True)name=Column(String)version=Column(Integer,default=1)last_updated=Column(DateTime)__table_args__={'postgresql_partition_by':'RANGE (last_updated)'}defupdate_with_optimistic_locking(session,user_id,new_name,expected_version):"""
낙관적 락킹을 사용한 업데이트
"""result=session.execute(text("""
UPDATE users
SET name = :name,
version = :new_version,
last_updated = NOW()
WHERE id = :user_id
AND version = :expected_version
"""),{'name':new_name,'new_version':expected_version+1,'user_id':user_id,'expected_version':expected_version})ifresult.rowcount==0:raiseConcurrentModificationError("데이터가 이미 수정되었습니다")
importredisfromfunctoolsimportwrapsclassCacheManager:def__init__(self):self.redis_client=redis.Redis(host='localhost',port=6379)defcache_with_version(self,key_prefix,timeout=300):"""
버전 관리가 포함된 캐시 데코레이터
"""defdecorator(f):@wraps(f)defwrapper(*args,**kwargs):cache_key=f"{key_prefix}:{args[0]}"version_key=f"{cache_key}:version"# 캐시된 버전 확인cached_version=self.redis_client.get(version_key)ifcached_version:cached_data=self.redis_client.get(cache_key)ifcached_data:returncached_data# 새로운 데이터 가져오기data=f(*args,**kwargs)# 캐시 업데이트pipe=self.redis_client.pipeline()pipe.set(cache_key,data,ex=timeout)pipe.incr(version_key)pipe.execute()returndatareturnwrapperreturndecorator