Transaction Log Tailing

Transaction log tailing은 마이크로서비스 아키텍처(MSA)에서 Transactional Messaging을 구현하는 방법 중 하나이다.
이 패턴은 데이터베이스의 트랜잭션 로그를 실시간으로 읽어 변경사항을 메시지 브로커에 발행하는 방식이다.

Transaction log tailing은 Polling publisher 방식과 비교될 수 있다. Polling은 주기적으로 데이터베이스를 조회하는 반면, log tailing은 실시간으로 변경사항을 감지한다. 이로 인해 log tailing이 더 빠르고 효율적이지만, 구현이 더 복잡할 수 있다.

이 패턴을 사용할 때는 메시지의 중복 발행 가능성을 고려해야 하며, 소비자 측에서 멱등성을 보장하는 방식으로 구현해야 한다.

작동 방식

  • 데이터베이스의 트랜잭션 로그를 지속적으로 모니터링한다.
  • 로그에서 변경사항을 감지하면 이를 메시지로 변환한다.
  • 변환된 메시지를 메시지 브로커(예: Kafka)에 발행한다.

구현

  • 데이터베이스 특화 솔루션이 필요하다 (예: MySQL binlog, Postgres WAL).
  • Debezium, LinkedIn Databus, DynamoDB Streams 등의 도구를 사용할 수 있다.

장점

  • 실시간 데이터 동기화가 가능하다.
  • 데이터베이스에 추가적인 부하를 주지 않는다.
  • 높은 확장성을 제공한다.

단점

  • 구현이 복잡할 수 있다.
  • 데이터베이스 종속적인 솔루션이 필요하다.
  • 운영 복잡성이 증가할 수 있다.

사용 사례

  • 실시간 데이터 동기화가 필요한 경우
  • 대규모 트랜잭션 처리 시스템
  • 이벤트 소싱 아키텍처

구현 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class LogBasedMessagePublisher:
    def __init__(self, config):
        self.db_connector = DatabaseConnector(config['database'])
        self.message_broker = MessageBroker(config['broker'])
        self.position_store = PositionStore()
        self.change_processors = {}
        
    async def process_log_entry(self, log_entry):
        """로그 엔트리를 처리하고 메시지 발행"""
        change_type = self.identify_change_type(log_entry)
        
        if change_type in self.change_processors:
            try:
                # 변경 내용을 메시지로 변환
                message = await self.change_processors[change_type](log_entry)
                
                # 메시지 발행
                await self.publish_message(message)
                
                # 처리 위치 저장
                await self.position_store.save_position(log_entry.position)
                
            except Exception as e:
                await self.handle_processing_error(e, log_entry)
    
    def identify_change_type(self, log_entry):
        """로그 엔트리의 변경 유형 식별"""
        # 트랜잭션 로그 포맷에 따른 변경 유형 파악
        if log_entry.operation == 'INSERT':
            return 'insert'
        elif log_entry.operation == 'UPDATE':
            return 'update'
        elif log_entry.operation == 'DELETE':
            return 'delete'

참고 및 출처